[jira] [Updated] (SPARK-47609) CacheManager Lookup can miss picking InMemoryRelation corresponding to subplan
[ https://issues.apache.org/jira/browse/SPARK-47609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Asif updated SPARK-47609:
-
Description:

This issue became apparent while bringing my PR [https://github.com/apache/spark/pull/43854] in sync with the latest master. That PR is mainly meant to collapse Projects early, in the analyzer phase itself, so that the tree size is kept at a minimum as Projects keep getting added. But as part of that work the CacheManager lookup also needed to be modified, and one of the newly added tests in master failed. Analysis of the failure shows that the CacheManager is not picking up the cached InMemoryRelation for a subplan.

This shows up in the following existing test in org.apache.spark.sql.DatasetCacheSuite:
{quote}
test("SPARK-26708 Cache data and cached plan should stay consistent") {
  val df = spark.range(0, 5).toDF("a")
  val df1 = df.withColumn("b", $"a" + 1)
  val df2 = df.filter($"a" > 1)

  df.cache()
  // Add df1 to the CacheManager; the buffer is currently empty.
  df1.cache()
  {color:#4c9aff}// After calling collect(), df1's buffer has been loaded.{color}
  df1.collect()
  // Add df2 to the CacheManager; the buffer is currently empty.
  df2.cache()

  // Verify that df1 is a InMemoryRelation plan with dependency on another cached plan.
  assertCacheDependency(df1)
  val df1InnerPlan = df1.queryExecution.withCachedData
    .asInstanceOf[InMemoryRelation].cacheBuilder.cachedPlan
  // Verify that df2 is a InMemoryRelation plan with dependency on another cached plan.
  assertCacheDependency(df2)

  df.unpersist(blocking = true)

  {color:#00875a}// Verify that df1's cache has stayed the same, since df1's cache already has data{color}
  // before df.unpersist().
  val df1Limit = df1.limit(2)
  val df1LimitInnerPlan = df1Limit.queryExecution.withCachedData.collectFirst {
    case i: InMemoryRelation => i.cacheBuilder.cachedPlan
  }
  assert(df1LimitInnerPlan.isDefined && df1LimitInnerPlan.get == df1InnerPlan)

  // Verify that df2's cache has been re-cached, with a new physical plan rid of dependency
  // on df, since df2's cache had not been loaded before df.unpersist().
  val df2Limit = df2.limit(2)
  val df2LimitInnerPlan = df2Limit.queryExecution.withCachedData.collectFirst {
    case i: InMemoryRelation => i.cacheBuilder.cachedPlan
  }
  *{color:#de350b}// This assertion is not right{color}*
  assert(df2LimitInnerPlan.isDefined &&
    !df2LimitInnerPlan.get.exists(_.isInstanceOf[InMemoryTableScanExec]))
}
{quote}
Since df1 exists in the cache as an InMemoryRelation, and given
{quote}
val df = spark.range(0, 5).toDF("a")
val df1 = df.withColumn("b", $"a" + 1)
val df2 = df.filter($"a" > 1)
{quote}
df2 is derivable from the cached df1. So when val df2Limit = df2.limit(2) is created, it should utilize the cached df1.

The pull request for this is [https://github.com/apache/spark/pull/43854]
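For reference, a quick way to see what the above assertions are checking, e.g. in spark-shell after running the df/df1/df2 setup from the test (df1 cached and materialized, df unpersisted): inspect the plan that backs df2's cache entry and look for an InMemoryTableScanExec. This is only an illustrative check mirroring the test above, not part of the proposed fix.
{quote}
import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}

// The physical plan backing df2's cache entry after df.unpersist(blocking = true).
val df2CachedPlan = df2.limit(2).queryExecution.withCachedData.collectFirst {
  case i: InMemoryRelation => i.cacheBuilder.cachedPlan
}

// If the CacheManager lookup recognized that df2 is derivable from the cached df1,
// this plan would contain an InMemoryTableScanExec over df1's cache; currently it does not.
val usesDf1Cache = df2CachedPlan.exists(_.exists(_.isInstanceOf[InMemoryTableScanExec]))
println(s"df2's re-cached plan reads from df1's cache: $usesDf1Cache")
{quote}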
[jira] [Updated] (SPARK-47609) CacheManager Lookup can miss picking InMemoryRelation corresponding to subplan
[ https://issues.apache.org/jira/browse/SPARK-47609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Asif updated SPARK-47609:
-
Description:

This issue became apparent while bringing my PR [https://github.com/apache/spark/pull/43854] in sync with the latest master. That PR is mainly meant to collapse Projects early, in the analyzer phase itself, so that the tree size is kept at a minimum as Projects keep getting added. But as part of that work the CacheManager lookup also needed to be modified, and one of the newly added tests in master failed. Analysis of the failure shows that the CacheManager is not picking up the cached InMemoryRelation for a subplan.

This shows up in the following existing test in org.apache.spark.sql.DatasetCacheSuite:
{quote}
test("SPARK-26708 Cache data and cached plan should stay consistent") {
  val df = spark.range(0, 5).toDF("a")
  val df1 = df.withColumn("b", $"a" + 1)
  val df2 = df.filter($"a" > 1)

  df.cache()
  // Add df1 to the CacheManager; the buffer is currently empty.
  df1.cache()
  {color:#4c9aff}// After calling collect(), df1's buffer has been loaded.{color}
  df1.collect()
  // Add df2 to the CacheManager; the buffer is currently empty.
  df2.cache()

  // Verify that df1 is a InMemoryRelation plan with dependency on another cached plan.
  assertCacheDependency(df1)
  val df1InnerPlan = df1.queryExecution.withCachedData
    .asInstanceOf[InMemoryRelation].cacheBuilder.cachedPlan
  // Verify that df2 is a InMemoryRelation plan with dependency on another cached plan.
  assertCacheDependency(df2)

  df.unpersist(blocking = true)

  {color:#00875a}// Verify that df1's cache has stayed the same, since df1's cache already has data{color}
  // before df.unpersist().
  val df1Limit = df1.limit(2)
  val df1LimitInnerPlan = df1Limit.queryExecution.withCachedData.collectFirst {
    case i: InMemoryRelation => i.cacheBuilder.cachedPlan
  }
  assert(df1LimitInnerPlan.isDefined && df1LimitInnerPlan.get == df1InnerPlan)

  // Verify that df2's cache has been re-cached, with a new physical plan rid of dependency
  // on df, since df2's cache had not been loaded before df.unpersist().
  val df2Limit = df2.limit(2)
  val df2LimitInnerPlan = df2Limit.queryExecution.withCachedData.collectFirst {
    case i: InMemoryRelation => i.cacheBuilder.cachedPlan
  }
  *{color:#de350b}// This assertion is not right{color}*
  assert(df2LimitInnerPlan.isDefined &&
    !df2LimitInnerPlan.get.exists(_.isInstanceOf[InMemoryTableScanExec]))
}
{quote}
Since df1 exists in the cache as an InMemoryRelation, and given
{quote}
val df = spark.range(0, 5).toDF("a")
val df1 = df.withColumn("b", $"a" + 1)
val df2 = df.filter($"a" > 1)
{quote}
df2 is derivable from the cached df1. So when val df2Limit = df2.limit(2) is created, it should utilize the cached df1.
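To illustrate the kind of lookup being argued for: roughly speaking, a CacheManager-style lookup replaces subtrees of a query plan that are result-equal to a cached plan with the cached representation; the argument here is that the lookup should also recognize plans that are merely derivable from a cached plan (e.g. an extra Project/Filter on top of it). A minimal sketch of the exact-match part follows; CacheEntry and the wiring are simplified and hypothetical, not Spark's actual CacheManager API.
{quote}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical, simplified cache entry: the logical plan that was cached and the
// relation (here represented as just another LogicalPlan) that stands in for it.
case class CacheEntry(plan: LogicalPlan, cachedRepresentation: LogicalPlan)

// Replace any subtree that computes the same result as a cached plan.
// sameResult compares canonicalized plans. Handling "derivable" subplans
// (the df2-from-df1 case above) would need an extra matching step here.
def useCachedData(plan: LogicalPlan, cache: Seq[CacheEntry]): LogicalPlan =
  plan.transformDown {
    case subPlan =>
      cache.find(_.plan.sameResult(subPlan))
        .map(_.cachedRepresentation)
        .getOrElse(subPlan)
  }
{quote}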
[jira] [Created] (SPARK-47609) CacheManager Lookup can miss picking InMemoryRelation corresponding to subplan
Asif created SPARK-47609:

Summary: CacheManager Lookup can miss picking InMemoryRelation corresponding to subplan
Key: SPARK-47609
URL: https://issues.apache.org/jira/browse/SPARK-47609
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.5.1
Reporter: Asif

This issue became apparent while bringing my PR [https://github.com/apache/spark/pull/43854] in sync with the latest master. That PR is mainly meant to collapse Projects early, in the analyzer phase itself, so that the tree size is kept at a minimum as Projects keep getting added. But as part of that work the CacheManager lookup also needed to be modified, and one of the newly added tests in master failed. Analysis of the failure shows that the CacheManager is not picking up the cached InMemoryRelation for a subplan.

This shows up in the following existing test:
{quote}
test("SPARK-26708 Cache data and cached plan should stay consistent") {
  val df = spark.range(0, 5).toDF("a")
  val df1 = df.withColumn("b", $"a" + 1)
  val df2 = df.filter($"a" > 1)

  df.cache()
  // Add df1 to the CacheManager; the buffer is currently empty.
  df1.cache()
  {color:#4c9aff}// After calling collect(), df1's buffer has been loaded.{color}
  df1.collect()
  // Add df2 to the CacheManager; the buffer is currently empty.
  df2.cache()

  // Verify that df1 is a InMemoryRelation plan with dependency on another cached plan.
  assertCacheDependency(df1)
  val df1InnerPlan = df1.queryExecution.withCachedData
    .asInstanceOf[InMemoryRelation].cacheBuilder.cachedPlan
  // Verify that df2 is a InMemoryRelation plan with dependency on another cached plan.
  assertCacheDependency(df2)

  df.unpersist(blocking = true)

  {color:#00875a}// Verify that df1's cache has stayed the same, since df1's cache already has data{color}
  // before df.unpersist().
  val df1Limit = df1.limit(2)
  val df1LimitInnerPlan = df1Limit.queryExecution.withCachedData.collectFirst {
    case i: InMemoryRelation => i.cacheBuilder.cachedPlan
  }
  assert(df1LimitInnerPlan.isDefined && df1LimitInnerPlan.get == df1InnerPlan)

  // Verify that df2's cache has been re-cached, with a new physical plan rid of dependency
  // on df, since df2's cache had not been loaded before df.unpersist().
  val df2Limit = df2.limit(2)
  val df2LimitInnerPlan = df2Limit.queryExecution.withCachedData.collectFirst {
    case i: InMemoryRelation => i.cacheBuilder.cachedPlan
  }
  *{color:#de350b}// This assertion is not right{color}*
  assert(df2LimitInnerPlan.isDefined &&
    !df2LimitInnerPlan.get.exists(_.isInstanceOf[InMemoryTableScanExec]))
}
{quote}
Since df1 exists in the cache as an InMemoryRelation, and given
{quote}
val df = spark.range(0, 5).toDF("a")
val df1 = df.withColumn("b", $"a" + 1)
val df2 = df.filter($"a" > 1)
{quote}
df2 is derivable from the cached df1. So when val df2Limit = df2.limit(2) is created, it should utilize the cached df1.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-26708) Incorrect result caused by inconsistency between a SQL cache's cached RDD and its physical plan
[ https://issues.apache.org/jira/browse/SPARK-26708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831116#comment-17831116 ]

Asif edited comment on SPARK-26708 at 3/27/24 12:58 AM:

I believe the current caching logic is suboptimal, and accordingly the test for this bug asserts a suboptimal behaviour. The test is:
{quote}
test("SPARK-26708 Cache data and cached plan should stay consistent") {
  val df = spark.range(0, 5).toDF("a")
  val df1 = df.withColumn("b", $"a" + 1)
  val df2 = df.filter($"a" > 1)

  df.cache()
  // Add df1 to the CacheManager; the buffer is currently empty.
  df1.cache()
  // After calling collect(), df1's buffer has been loaded.
  df1.collect()
  // Add df2 to the CacheManager; the buffer is currently empty.
  df2.cache()

  // Verify that df1 is a InMemoryRelation plan with dependency on another cached plan.
  assertCacheDependency(df1)
  val df1InnerPlan = df1.queryExecution.withCachedData
    .asInstanceOf[InMemoryRelation].cacheBuilder.cachedPlan
  // Verify that df2 is a InMemoryRelation plan with dependency on another cached plan.
  assertCacheDependency(df2)

  df.unpersist(blocking = true)

  // Verify that df1's cache has stayed the same, since df1's cache already has data
  // before df.unpersist().
  val df1Limit = df1.limit(2)
  val df1LimitInnerPlan = df1Limit.queryExecution.withCachedData.collectFirst {
    case i: InMemoryRelation => i.cacheBuilder.cachedPlan
  }
  assert(df1LimitInnerPlan.isDefined && df1LimitInnerPlan.get == df1InnerPlan)

  // Verify that df2's cache has been re-cached, with a new physical plan rid of dependency
  // on df, since df2's cache had not been loaded before df.unpersist().
  val df2Limit = df2.limit(2)
  val df2LimitInnerPlan = df2Limit.queryExecution.withCachedData.collectFirst {
    case i: InMemoryRelation => i.cacheBuilder.cachedPlan
  }
  assert(df2LimitInnerPlan.isDefined &&
    !df2LimitInnerPlan.get.exists(_.isInstanceOf[InMemoryTableScanExec]))
}
{quote}
Optimal caching should have resulted in df2LimitInnerPlan actually containing an InMemoryTableScanExec corresponding to df1. The reason is that df1 was already materialized, so it rightly stays in the cache, and df2 is derivable from the cached df1 (it just needs an extra projection, but otherwise the cached df1 can serve df2).

> Incorrect result caused by inconsistency between a SQL cache's cached RDD and
> its physical plan
> ---
>
> Key: SPARK-26708
> URL: https://issues.apache.org/jira/browse/SPARK-26708
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Xiao Li
> Assignee: Wei Xue
> Priority: Blocker
> Labels: correctness
> Fix For: 2.4.1, 3.0.0
>
>
> When
[jira] [Comment Edited] (SPARK-26708) Incorrect result caused by inconsistency between a SQL cache's cached RDD and its physical plan
[ https://issues.apache.org/jira/browse/SPARK-26708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831117#comment-17831117 ]

Asif edited comment on SPARK-26708 at 3/27/24 12:54 AM:

Towards that, please take a look at SPARK-45959 (https://issues.apache.org/jira/browse/SPARK-45959) and the PR associated with it. That PR primarily deals with aggressive collapse of Projects at the end of analysis, but as part of the fix it also uses an enhanced cached-plan lookup and thus results in the behaviour described above.

was (Author: ashahid7):
Towards that, please take a look at the ticket & PR: [https://issues.apache.org/jira/browse/SPARK-45959]

> Incorrect result caused by inconsistency between a SQL cache's cached RDD and
> its physical plan
> ---
>
> Key: SPARK-26708
> URL: https://issues.apache.org/jira/browse/SPARK-26708
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Xiao Li
> Assignee: Wei Xue
> Priority: Blocker
> Labels: correctness
> Fix For: 2.4.1, 3.0.0
>
>
> When performing non-cascading cache invalidation, {{recache}} is called on
> the other cache entries which are dependent on the cache being invalidated.
> It leads to the physical plans of those cache entries being re-compiled.
> For those cache entries, if the cache RDD has already been persisted, chances
> are there will be inconsistency between the data and the new plan. It can
> cause a correctness issue if the new plan's {{outputPartitioning}} or
> {{outputOrdering}} is different from that of the actual data, and
> meanwhile the cache is used by another query that asks for specific
> {{outputPartitioning}} or {{outputOrdering}} which happens to match the new
> plan but not the actual data.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-26708) Incorrect result caused by inconsistency between a SQL cache's cached RDD and its physical plan
[ https://issues.apache.org/jira/browse/SPARK-26708?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17831116#comment-17831116 ]

Asif commented on SPARK-26708:
--

I believe the current caching logic is suboptimal, and accordingly the test for this bug asserts a suboptimal behaviour. The test is:
{quote}
test("SPARK-26708 Cache data and cached plan should stay consistent") {
  val df = spark.range(0, 5).toDF("a")
  val df1 = df.withColumn("b", $"a" + 1)
  val df2 = df.filter($"a" > 1)

  df.cache()
  // Add df1 to the CacheManager; the buffer is currently empty.
  df1.cache()
  // After calling collect(), df1's buffer has been loaded.
  df1.collect()
  // Add df2 to the CacheManager; the buffer is currently empty.
  df2.cache()

  // Verify that df1 is a InMemoryRelation plan with dependency on another cached plan.
  assertCacheDependency(df1)
  val df1InnerPlan = df1.queryExecution.withCachedData
    .asInstanceOf[InMemoryRelation].cacheBuilder.cachedPlan
  // Verify that df2 is a InMemoryRelation plan with dependency on another cached plan.
  assertCacheDependency(df2)

  df.unpersist(blocking = true)

  // Verify that df1's cache has stayed the same, since df1's cache already has data
  // before df.unpersist().
  val df1Limit = df1.limit(2)
  val df1LimitInnerPlan = df1Limit.queryExecution.withCachedData.collectFirst {
    case i: InMemoryRelation => i.cacheBuilder.cachedPlan
  }
  assert(df1LimitInnerPlan.isDefined && df1LimitInnerPlan.get == df1InnerPlan)

  // Verify that df2's cache has been re-cached, with a new physical plan rid of dependency
  // on df, since df2's cache had not been loaded before df.unpersist().
  val df2Limit = df2.limit(2)
  val df2LimitInnerPlan = df2Limit.queryExecution.withCachedData.collectFirst {
    case i: InMemoryRelation => i.cacheBuilder.cachedPlan
  }
  assert(df2LimitInnerPlan.isDefined &&
    !df2LimitInnerPlan.get.exists(_.isInstanceOf[InMemoryTableScanExec]))
}
{quote}
Optimal caching should have resulted in df2LimitInnerPlan actually containing an InMemoryTableScanExec corresponding to df1. The reason is that df1 was already materialized, so it rightly stays in the cache, and df2 is derivable from the cached df1 (it just needs an extra projection, but otherwise the cached df1 can serve df2).

> Incorrect result caused by inconsistency between a SQL cache's cached RDD and
> its physical plan
> ---
>
> Key: SPARK-26708
> URL: https://issues.apache.org/jira/browse/SPARK-26708
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.4.0
> Reporter: Xiao Li
> Assignee: Wei Xue
> Priority: Blocker
> Labels: correctness
> Fix For: 2.4.1, 3.0.0
>
>
> When performing non-cascading cache invalidation, {{recache}} is called on
> the other cache entries which are dependent on the cache being invalidated.
> It leads to the physical plans of those cache entries being re-compiled.
> For those cache entries, if the cache RDD has already been persisted, chances
> are there will be inconsistency between the data and the new plan. It can
> cause a correctness issue if the new plan's {{outputPartitioning}} or
> {{outputOrdering}} is different from that of the actual data, and
> meanwhile the cache is used by another query that asks for specific
> {{outputPartitioning}} or {{outputOrdering}} which happens to match the new
> plan but not the actual data.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
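For readers less familiar with the scenario, a condensed version of the cache setup that this test exercises is below (a sketch for spark-shell; assertCacheDependency is a test-suite helper and is omitted here):
{quote}
val df  = spark.range(0, 5).toDF("a")
val df1 = df.withColumn("b", $"a" + 1)
val df2 = df.filter($"a" > 1)

df.cache()
df1.cache()
df1.collect()   // df1's cache buffer is now materialized
df2.cache()     // df2's cache entry exists but has not been loaded yet

// Non-cascading invalidation: df's entry is dropped and dependent entries are re-cached.
// df1 keeps its already-loaded buffer; df2, never having been loaded, gets a freshly
// compiled physical plan, which is where the cached-data-vs-plan consistency question arises.
df.unpersist(blocking = true)
{quote}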
[jira] [Updated] (SPARK-47320) Datasets involving self joins behave in an inconsistent and unintuitive manner
[ https://issues.apache.org/jira/browse/SPARK-47320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Asif updated SPARK-47320:
-
Description:

Datasets involving self joins behave in an inconsistent and unintuitive manner with respect to when an AnalysisException is thrown due to ambiguity and when the query works.

I found situations where swapping the join order causes a query to throw ambiguity-related exceptions while the original order passes. Some Datasets which are unambiguous from the user's perspective result in an AnalysisException being thrown.

After testing and fixing a bug, I think the issue lies in inconsistency in determining what constitutes ambiguous and what is unambiguous.

There are two ways to look at resolution regarding ambiguity:
1) ExprId of attributes: this is an unintuitive approach, as Spark users do not bother with ExprIds.
2) Column extraction from the Dataset using the df(col) API: this is the user-visible/understandable point of view, so determining ambiguity should be based on it. What is logically unambiguous from the user's perspective (assuming it is logically correct) should also be the basis on which Spark decides ambiguity.

For example:
{quote}
val df1 = Seq((1, 2)).toDF("a", "b")
val df2 = Seq((1, 2)).toDF("aa", "bb")
val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b"))
val df3 = df1Joindf2.join(df1, df1Joindf2("aa") === df1("a")).select(df1("a"))
{quote}
From perspective #1 the above code should throw an ambiguity exception, because the join condition and the projection of the df3 DataFrame use df1("a"), whose exprId matches both df1Joindf2 and df1.

But if we look at it from the perspective of the Dataset used to get the column, which is the intent of the user, the expectation is that df1("a") should be resolved against the Dataset df1 being joined, and not against df1Joindf2. If the user had intended "a" from df1Joindf2, they would have used df1Joindf2("a").

So in this case current Spark throws an exception, as it uses resolution based on #1.

But by the above logic the DataFrame below should also throw an ambiguity exception, yet it passes:
{quote}
val df1 = Seq((1, 2)).toDF("a", "b")
val df2 = Seq((1, 2)).toDF("aa", "bb")
val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b"))
df1Joindf2.join(df1, df1Joindf2("a") === df1("a"))
{quote}
The difference between the two cases is that in the first case a select is present, while in the second query there is no select. So this implies that in the first case the df1("a") in the projection causes the ambiguity issue, but the same reference in the second case, used only in the join condition, is considered unambiguous.

IMHO, the ambiguity-identification criteria should be based entirely on #2, and applied consistently.

In DataFrameJoinSuite and DataFrameSelfJoinSuite, if we go by #2, some of the tests which are currently considered ambiguous (per criterion #1) become unambiguous under criterion #2.

There is an existing test in DataFrameSelfJoinSuite:
{quote}
test("SPARK-28344: fail ambiguous self join - column ref in Project") {
  val df1 = spark.range(3)
  val df2 = df1.filter($"id" > 0)
  // Assertion1 : existing
  assertAmbiguousSelfJoin(df1.join(df2).select(df2("id")))
  // Assertion2 : added by me
  assertAmbiguousSelfJoin(df2.join(df1).select(df2("id")))
}
{quote}
Here Assertion1 passes (that is, the ambiguity exception is thrown), but Assertion2 fails (that is, no ambiguity exception is thrown). The only change is the join order. Logically both assertions are invalid, in the sense that neither should be throwing an exception, as from the user's perspective there is no ambiguity.

Also, much of this confusion arises because join conditions are attempted to be resolved against the "un-deduplicated" plan. The attempt to resolve the join condition should be made after deduplication of the join plan, which is what the PR for this bug does.
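As a side note, the usual way to sidestep the ambiguity today is the aliasing approach used elsewhere in the same test suite: alias the DataFrames and select via qualified column names. A small sketch, adapted from the quoted test:
{quote}
val df1 = spark.range(3)
val df2 = df1.filter($"id" > 0)

// Aliasing the DataFrames and using qualified column names avoids the
// ambiguous self-join detection altogether.
val aliasedDf1 = df1.alias("left")
val aliasedDf2 = df2.as("right")
aliasedDf1.join(aliasedDf2).select($"right.id").show()
{quote}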
[jira] [Updated] (SPARK-47320) Datasets involving self joins behave in an inconsistent and unintuitive manner
[ https://issues.apache.org/jira/browse/SPARK-47320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-47320: - Labels: pull-request-available (was: ) > Datasets involving self joins behave in an inconsistent and unintuitive > manner > > > Key: SPARK-47320 > URL: https://issues.apache.org/jira/browse/SPARK-47320 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.1 >Reporter: Asif >Priority: Major > Labels: pull-request-available > > The behaviour of Datasets involving self joins behave in an unintuitive > manner in terms when AnalysisException is thrown due to ambiguity and when it > works. > Found situations where join order swapping causes query to throw Ambiguity > related exceptions which otherwise passes. Some of the Datasets which from > user perspective are un-ambiguous will result in Analysis Exception getting > thrown. > After testing and fixing a bug , I think the issue lies in inconsistency in > determining what constitutes ambiguous and what is un-ambiguous. > There are two ways to look at resolution regarding ambiguity > 1) ExprId of attributes : This is unintuitive approach as spark users do not > bother with the ExprIds > 2) Column Extraction from the Dataset using df(col) api : Which is the user > visible/understandable Point of View. So determining ambiguity should be > based on this. What is Logically unambiguous from users perspective ( > assuming its is logically correct) , should also be the basis of spark > product, to decide on un-ambiguity. > For Example: > {quote} > val df1 = Seq((1, 2)).toDF("a", "b") > val df2 = Seq((1, 2)).toDF("aa", "bb") > val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), > df2("aa"), df1("b")) > val df3 = df1Joindf2.join(df1, df1Joindf2("aa") === > df1("a")).select(df1("a")) > {quote} > The above code from perspective #1 should throw ambiguity exception, because > the join condition and projection of df3 dataframe, has df1("a) which has > exprId which matches both df1Joindf2 and df1. > But if we look is from perspective of Dataset used to get column, which is > the intent of the user, the expectation is that df1("a) should be resolved > to Dataset df1 being joined, and not > df1Joindf2. If user intended "a" from df1Joindf2, then would have used > df1Joindf2("a") > So In this case , current spark throws Exception as it is using resolution > based on # 1 > But the below Dataframe by the above logic, should also throw Ambiguity > Exception but it passes > {quote} > val df1 = Seq((1, 2)).toDF("a", "b") > val df2 = Seq((1, 2)).toDF("aa", "bb") > val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), > df2("aa"), df1("b")) > df1Joindf2.join(df1, df1Joindf2("a") === df1("a")) > {quote} > The difference in the 2 cases is that in the first case , select is present. > But in the 2nd query, select is not there. > So this implies that in 1st case the df1("a") in projection is causing > ambiguity issue, but same reference in 2nd case, used just in condition, is > considered un-ambiguous. > IMHO , the ambiguity identification criteria should be based totally on #2 > and consistently. > In the DataFrameJoinTest and DataFrameSelfJoinTest, if we go by #2, some of > the tests which are being considered ambiguous ( on # 1 criteria) become > un-ambiguous using (#2) criteria. 
> There is an existing test in DataFrameSelfJoinSuite > {quote} > test("SPARK-28344: fail ambiguous self join - column ref in Project") > val df1 = spark.range(3) > val df2 = df1.filter($"id" > 0) > Assertion1 : existing > assertAmbiguousSelfJoin(df1.join(df2).select(df2("id"))) > Assertion2 : added by me > assertAmbiguousSelfJoin(df2.join(df1).select(df2("id"))) > } > {quote} > Here the Assertion1 passes ( that is ambiguous exception is thrown) > But the Assertion2 fails ( that is no ambiguous exception is thrown) > The only chnage is the join order > Logically both the assertions are invalid ( In the sense both should NOT be > throwing Exception as from the user's perspective there is no ambiguity. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-47320) Datasets involving self joins behave in an inconsistent and unintuitive manner
[ https://issues.apache.org/jira/browse/SPARK-47320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-47320: - Description: The behaviour of Datasets involving self joins behave in an unintuitive manner in terms when AnalysisException is thrown due to ambiguity and when it works. Found situations where join order swapping causes query to throw Ambiguity related exceptions which otherwise passes. Some of the Datasets which from user perspective are un-ambiguous will result in Analysis Exception getting thrown. After testing and fixing a bug , I think the issue lies in inconsistency in determining what constitutes ambiguous and what is un-ambiguous. There are two ways to look at resolution regarding ambiguity 1) ExprId of attributes : This is unintuitive approach as spark users do not bother with the ExprIds 2) Column Extraction from the Dataset using df(col) api : Which is the user visible/understandable Point of View. So determining ambiguity should be based on this. What is Logically unambiguous from users perspective ( assuming its is logically correct) , should also be the basis of spark product, to decide on un-ambiguity. For Example: {quote} val df1 = Seq((1, 2)).toDF("a", "b") val df2 = Seq((1, 2)).toDF("aa", "bb") val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b")) val df3 = df1Joindf2.join(df1, df1Joindf2("aa") === df1("a")).select(df1("a")) {quote} The above code from perspective #1 should throw ambiguity exception, because the join condition and projection of df3 dataframe, has df1("a) which has exprId which matches both df1Joindf2 and df1. But if we look is from perspective of Dataset used to get column, which is the intent of the user, the expectation is that df1("a) should be resolved to Dataset df1 being joined, and not df1Joindf2. If user intended "a" from df1Joindf2, then would have used df1Joindf2("a") So In this case , current spark throws Exception as it is using resolution based on # 1 But the below Dataframe by the above logic, should also throw Ambiguity Exception but it passes {quote} val df1 = Seq((1, 2)).toDF("a", "b") val df2 = Seq((1, 2)).toDF("aa", "bb") val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b")) df1Joindf2.join(df1, df1Joindf2("a") === df1("a")) {quote} The difference in the 2 cases is that in the first case , select is present. But in the 2nd query, select is not there. So this implies that in 1st case the df1("a") in projection is causing ambiguity issue, but same reference in 2nd case, used just in condition, is considered un-ambiguous. IMHO , the ambiguity identification criteria should be based totally on #2 and consistently. In the DataFrameJoinTest and DataFrameSelfJoinTest, if we go by #2, some of the tests which are being considered ambiguous ( on # 1 criteria) become un-ambiguous using (#2) criteria. 
There is an existing test in DataFrameSelfJoinSuite {quote} test("SPARK-28344: fail ambiguous self join - column ref in Project") val df1 = spark.range(3) val df2 = df1.filter($"id" > 0) Assertion1 : existing assertAmbiguousSelfJoin(df1.join(df2).select(df2("id"))) Assertion2 : added by me assertAmbiguousSelfJoin(df2.join(df1).select(df2("id"))) } {quote} Here the Assertion1 passes ( that is ambiguous exception is thrown) But the Assertion2 fails ( that is no ambiguous exception is thrown) The only chnage is the join order Logically both the assertions are invalid ( In the sense both should NOT be throwing Exception as from the user's perspective there is no ambiguity. was: The behaviour of Datasets involving self joins behave in an unintuitive manner in terms when AnalysisException is thrown due to ambiguity and when it works. Found situations where join order swapping causes query to throw Ambiguity related exceptions which otherwise passes. Some of the Datasets which from user perspective are un-ambiguous will result in Analysis Exception getting thrown. After testing and fixing a bug , I think the issue lies in inconsistency in determining what constitutes ambiguous and what is un-ambiguous. There are two ways to look at resolution regarding ambiguity 1) ExprId of attributes : This is unintuitive approach as spark users do not bother with the ExprIds 2) Column Extraction from the Dataset using df(col) api : Which is the user visible/understandable Point of View. So determining ambiguity should be based on this. What is Logically unambiguous from users perspective ( assuming its is logically correct) , should also be the basis of spark product, to decide on un-ambiguity. For Example: {quote} val df1 = Seq((1, 2)).toDF("a", "b") val df2 = Seq((1, 2)).toDF("aa", "bb") val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b")) val df3 = df1Joindf2.join(df1,
[jira] [Updated] (SPARK-47320) Datasets involving self joins behave in an inconsistent and unintuitive manner
[ https://issues.apache.org/jira/browse/SPARK-47320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-47320: - Description: The behaviour of Datasets involving self joins behave in an unintuitive manner in terms when AnalysisException is thrown due to ambiguity and when it works. Found situations where join order swapping causes query to throw Ambiguity related exceptions which otherwise passes. Some of the Datasets which from user perspective are un-ambiguous will result in Analysis Exception getting thrown. After testing and fixing a bug , I think the issue lies in inconsistency in determining what constitutes ambiguous and what is un-ambiguous. There are two ways to look at resolution regarding ambiguity 1) ExprId of attributes : This is unintuitive approach as spark users do not bother with the ExprIds 2) Column Extraction from the Dataset using df(col) api : Which is the user visible/understandable Point of View. So determining ambiguity should be based on this. What is Logically unambiguous from users perspective ( assuming its is logically correct) , should also be the basis of spark product, to decide on un-ambiguity. For Example: {quote} val df1 = Seq((1, 2)).toDF("a", "b") val df2 = Seq((1, 2)).toDF("aa", "bb") val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b")) val df3 = df1Joindf2.join(df1, df1Joindf2("aa") === df1("a")).select(df1("a")) {quote} The above code from perspective #1 should throw ambiguity exception, because the join condition and projection of df3 dataframe, has df1("a) which has exprId which matches both df1Joindf2 and df1. But if we look is from perspective of Dataset used to get column, which is the intent of the user, the expectation is that df1("a) should be resolved to Dataset df1 being joined, and not df1Joindf2. If user intended "a" from df1Joindf2, then would have used df1Joindf2("a") So In this case , current spark throws Exception as it is using resolution based on # 1 But the below Dataframe by the above logic, should also throw Ambiguity Exception but it passes {quote} val df1 = Seq((1, 2)).toDF("a", "b") val df2 = Seq((1, 2)).toDF("aa", "bb") val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b")) df1Joindf2.join(df1, df1Joindf2("a") === df1("a")) {quote} The difference in the 2 cases is that in the first case , select is present. But in the 2nd query, select is not there. So this implies that in 1st case the df1("a") in projection is causing ambiguity issue, but same reference in 2nd case, used just in condition, is considered un-ambiguous. IMHO , the ambiguity identification criteria should be based totally on #2 and consistently. In the DataFrameJoinTest and DataFrameSelfJoinTest, if we go by #2, some of the tests which are being considered ambiguous ( on # 1 criteria) become un-ambiguous using (#2) criteria. 
for eg: {quote} test("SPARK-28344: fail ambiguous self join - column ref in join condition") { val df1 = spark.range(3) val df2 = df1.filter($"id" > 0) @@ -118,29 +139,32 @@ class DataFrameSelfJoinSuite extends QueryTest with SharedSparkSession { withSQLConf( SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key -> "true", SQLConf.CROSS_JOINS_ENABLED.key -> "true") { assertAmbiguousSelfJoin(df1.join(df2, df1("id") > df2("id"))) } } {quote} The above test should not have ambiguity exception thrown as df1("id") and df2("id") are un-ambiguous from perspective of Dataset There is an existing test in DataFrameSelfJoinSuite {quote} test("SPARK-28344: fail ambiguous self join - column ref in Project") { val df1 = spark.range(3) val df2 = df1.filter($"id" > 0) // Assertion1 : existing assertAmbiguousSelfJoin(df1.join(df2).select(df2("id"))) // Assertion2 : added by me assertAmbiguousSelfJoin(df2.join(df1).select(df2("id"))) } {quote} Here the Assertion1 passes ( that is ambiguous exception is thrown) But the Assertion2 fails ( that is no ambiguous exception is thrown) The only chnage is the join order Logically both the assertions are invalid ( In the sense both should NOT be throwing Exception as from the user's perspective there is no ambiguity. was: The behaviour of Datasets involving self joins behave in an unintuitive manner in terms when AnalysisException is thrown due to ambiguity and when it works. Found situations where join order swapping causes query to throw Ambiguity related exceptions which otherwise passes. Some of the Datasets which from user perspective are un-ambiguous will result in Analysis Exception getting thrown. After testing and fixing a bug , I think the issue lies in inconsistency in determining what constitutes ambiguous and what is un-ambiguous. There are two ways to look at resolution regarding ambiguity 1) ExprId of attributes : This is unintuitive
[jira] [Updated] (SPARK-47320) Datasets involving self joins behave in an inconsistent and unintuitive manner
[ https://issues.apache.org/jira/browse/SPARK-47320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-47320: - Description: The behaviour of Datasets involving self joins behave in an unintuitive manner in terms when AnalysisException is thrown due to ambiguity and when it works. Found situations where join order swapping causes query to throw Ambiguity related exceptions which otherwise passes. Some of the Datasets which from user perspective are un-ambiguous will result in Analysis Exception getting thrown. After testing and fixing a bug , I think the issue lies in inconsistency in determining what constitutes ambiguous and what is un-ambiguous. There are two ways to look at resolution regarding ambiguity 1) ExprId of attributes : This is unintuitive approach as spark users do not bother with the ExprIds 2) Column Extraction from the Dataset using df(col) api : Which is the user visible/understandable Point of View. So determining ambiguity should be based on this. What is Logically unambiguous from users perspective ( assuming its is logically correct) , should also be the basis of spark product, to decide on un-ambiguity. For Example: {quote} val df1 = Seq((1, 2)).toDF("a", "b") val df2 = Seq((1, 2)).toDF("aa", "bb") val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b")) val df3 = df1Joindf2.join(df1, df1Joindf2("aa") === df1("a")).select(df1("a")) {quote} The above code from perspective #1 should throw ambiguity exception, because the join condition and projection of df3 dataframe, has df1("a) which has exprId which matches both df1Joindf2 and df1. But if we look is from perspective of Dataset used to get column, which is the intent of the user, the expectation is that df1("a) should be resolved to Dataset df1 being joined, and not df1Joindf2. If user intended "a" from df1Joindf2, then would have used df1Joindf2("a") So In this case , current spark throws Exception as it is using resolution based on # 1 But the below Dataframe by the above logic, should also throw Ambiguity Exception but it passes {quote} val df1 = Seq((1, 2)).toDF("a", "b") val df2 = Seq((1, 2)).toDF("aa", "bb") val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b")) df1Joindf2.join(df1, df1Joindf2("a") === df1("a")) {quote} The difference in the 2 cases is that in the first case , select is present. But in the 2nd query, select is not there. So this implies that in 1st case the df1("a") in projection is causing ambiguity issue, but same reference in 2nd case, used just in condition, is considered un-ambiguous. IMHO , the ambiguity identification criteria should be based totally on #2 and consistently. In the DataFrameJoinTest and DataFrameSelfJoinTest, if we go by #2, some of the tests which are being considered ambiguous ( on # 1 criteria) become un-ambiguous using (#2) criteria. 
for eg: test("SPARK-28344: fail ambiguous self join - column ref in join condition") { val df1 = spark.range(3) val df2 = df1.filter($"id" > 0) @@ -118,29 +139,32 @@ class DataFrameSelfJoinSuite extends QueryTest with SharedSparkSession { withSQLConf( SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key -> "true", SQLConf.CROSS_JOINS_ENABLED.key -> "true") { assertAmbiguousSelfJoin(df1.join(df2, df1("id") > df2("id"))) } } {quote} The above test should not have ambiguity exception thrown as df1("id") and df2("id") are un-ambiguous from perspective of Dataset There is an existing test in DataFrameSelfJoinSuite ``` test("SPARK-28344: fail ambiguous self join - column ref in Project") { val df1 = spark.range(3) val df2 = df1.filter($"id" > 0) withSQLConf( SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key -> "false", SQLConf.CROSS_JOINS_ENABLED.key -> "true") { // `df2("id")` actually points to the column of `df1`. checkAnswer(df1.join(df2).select(df2("id")), Seq(0, 0, 1, 1, 2, 2).map(Row(_))) // Alias the dataframe and use qualified column names can fix ambiguous self-join. val aliasedDf1 = df1.alias("left") val aliasedDf2 = df2.as("right") checkAnswer( aliasedDf1.join(aliasedDf2).select($"right.id"), Seq(1, 1, 1, 2, 2, 2).map(Row(_))) } withSQLConf( SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key -> "true", SQLConf.CROSS_JOINS_ENABLED.key -> "true") { // Assertion1 : existing assertAmbiguousSelfJoin(df1.join(df2).select(df2("id"))) // Assertion2 : added by me assertAmbiguousSelfJoin(df2.join(df1).select(df2("id"))) } } ``` Here the Assertion1 passes ( that is ambiguous exception is thrown) But the Assertion2 fails ( that is no ambiguous exception is thrown) The only chnage is the join order Logically both the assertions are invalid ( In the sense both should NOT be throwing Exception as from the user's perspective there is no ambiguity. was: The behaviour of
[jira] [Updated] (SPARK-47320) Datasets involving self joins behave in an inconsistent and unintuitive manner
[ https://issues.apache.org/jira/browse/SPARK-47320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-47320: - Description: The behaviour of Datasets involving self joins behave in an unintuitive manner in terms when AnalysisException is thrown due to ambiguity and when it works. Found situations where join order swapping causes query to throw Ambiguity related exceptions which otherwise passes. Some of the Datasets which from user perspective are un-ambiguous will result in Analysis Exception getting thrown. After testing and fixing a bug , I think the issue lies in inconsistency in determining what constitutes ambiguous and what is un-ambiguous. There are two ways to look at resolution regarding ambiguity 1) ExprId of attributes : This is unintuitive approach as spark users do not bother with the ExprIds 2) Column Extraction from the Dataset using df(col) api : Which is the user visible/understandable Point of View. So determining ambiguity should be based on this. What is Logically unambiguous from users perspective ( assuming its is logically correct) , should also be the basis of spark product, to decide on un-ambiguity. For Example: {quote} val df1 = Seq((1, 2)).toDF("a", "b") val df2 = Seq((1, 2)).toDF("aa", "bb") val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b")) val df3 = df1Joindf2.join(df1, df1Joindf2("aa") === df1("a")).select(df1("a")) {quote} The above code from perspective #1 should throw ambiguity exception, because the join condition and projection of df3 dataframe, has df1("a) which has exprId which matches both df1Joindf2 and df1. But if we look is from perspective of Dataset used to get column, which is the intent of the user, the expectation is that df1("a) should be resolved to Dataset df1 being joined, and not df1Joindf2. If user intended "a" from df1Joindf2, then would have used df1Joindf2("a") So In this case , current spark throws Exception as it is using resolution based on # 1 But the below Dataframe by the above logic, should also throw Ambiguity Exception but it passes {quote} val df1 = Seq((1, 2)).toDF("a", "b") val df2 = Seq((1, 2)).toDF("aa", "bb") val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b")) df1Joindf2.join(df1, df1Joindf2("a") === df1("a")) {quote} The difference in the 2 cases is that in the first case , select is present. But in the 2nd query, select is not there. So this implies that in 1st case the df1("a") in projection is causing ambiguity issue, but same reference in 2nd case, used just in condition, is considered un-ambiguous. IMHO , the ambiguity identification criteria should be based totally on #2 and consistently. In the DataFrameJoinTest and DataFrameSelfJoinTest, if we go by #2, some of the tests which are being considered ambiguous ( on # 1 criteria) become un-ambiguous using (#2) criteria. 
for eg: {quote} test("SPARK-28344: fail ambiguous self join - column ref in join condition") { val df1 = spark.range(3) val df2 = df1.filter($"id" > 0) @@ -118,29 +139,32 @@ class DataFrameSelfJoinSuite extends QueryTest with SharedSparkSession { withSQLConf( SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key -> "true", SQLConf.CROSS_JOINS_ENABLED.key -> "true") { assertAmbiguousSelfJoin(df1.join(df2, df1("id") > df2("id"))) } } {quote} The above test should not have ambiguity exception thrown as df1("id") and df2("id") are un-ambiguous from perspective of Dataset There is an existing test in DataFrameSelfJoinSuite {quote} test("SPARK-28344: fail ambiguous self join - column ref in Project") { val df1 = spark.range(3) val df2 = df1.filter($"id" > 0) withSQLConf( SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key -> "false", SQLConf.CROSS_JOINS_ENABLED.key -> "true") { // `df2("id")` actually points to the column of `df1`. checkAnswer(df1.join(df2).select(df2("id")), Seq(0, 0, 1, 1, 2, 2).map(Row(_))) // Alias the dataframe and use qualified column names can fix ambiguous self-join. val aliasedDf1 = df1.alias("left") val aliasedDf2 = df2.as("right") checkAnswer( aliasedDf1.join(aliasedDf2).select($"right.id"), Seq(1, 1, 1, 2, 2, 2).map(Row(_))) } withSQLConf( SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key -> "true", SQLConf.CROSS_JOINS_ENABLED.key -> "true") { // Assertion1 : existing assertAmbiguousSelfJoin(df1.join(df2).select(df2("id"))) // Assertion2 : added by me assertAmbiguousSelfJoin(df2.join(df1).select(df2("id"))) } } {quote} Here the Assertion1 passes ( that is ambiguous exception is thrown) But the Assertion2 fails ( that is no ambiguous exception is thrown) The only chnage is the join order Logically both the assertions are invalid ( In the sense both should NOT be throwing Exception as from the user's perspective there is no ambiguity. was: The
[jira] [Updated] (SPARK-47320) Datasets involving self joins behave in an inconsistent and unintuitive manner
[ https://issues.apache.org/jira/browse/SPARK-47320?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-47320: - Description: The behaviour of Datasets involving self joins behave in an unintuitive manner in terms when AnalysisException is thrown due to ambiguity and when it works. Found situations where join order swapping causes query to throw Ambiguity related exceptions which otherwise passes. Some of the Datasets which from user perspective are un-ambiguous will result in Analysis Exception getting thrown. After testing and fixing a bug , I think the issue lies in inconsistency in determining what constitutes ambiguous and what is un-ambiguous. There are two ways to look at resolution regarding ambiguity 1) ExprId of attributes : This is unintuitive approach as spark users do not bother with the ExprIds 2) Column Extraction from the Dataset using df(col) api : Which is the user visible/understandable Point of View. So determining ambiguity should be based on this. What is Logically unambiguous from users perspective ( assuming its is logically correct) , should also be the basis of spark product, to decide on un-ambiguity. For Example: {quote} val df1 = Seq((1, 2)).toDF("a", "b") val df2 = Seq((1, 2)).toDF("aa", "bb") val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b")) val df3 = df1Joindf2.join(df1, df1Joindf2("aa") === df1("a")).select(df1("a")) {quote} The above code from perspective #1 should throw ambiguity exception, because the join condition and projection of df3 dataframe, has df1("a) which has exprId which matches both df1Joindf2 and df1. But if we look is from perspective of Dataset used to get column, which is the intent of the user, the expectation is that df1("a) should be resolved to Dataset df1 being joined, and not df1Joindf2. If user intended "a" from df1Joindf2, then would have used df1Joindf2("a") So In this case , current spark throws Exception as it is using resolution based on # 1 But the below Dataframe by the above logic, should also throw Ambiguity Exception but it passes {quote} val df1 = Seq((1, 2)).toDF("a", "b") val df2 = Seq((1, 2)).toDF("aa", "bb") val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b")) df1Joindf2.join(df1, df1Joindf2("a") === df1("a")) {quote} The difference in the 2 cases is that in the first case , select is present. But in the 2nd query, select is not there. So this implies that in 1st case the df1("a") in projection is causing ambiguity issue, but same reference in 2nd case, used just in condition, is considered un-ambiguous. IMHO , the ambiguity identification criteria should be based totally on #2 and consistently. In the DataFrameJoinTest and DataFrameSelfJoinTest, if we go by #2, some of the tests which are being considered ambiguous ( on # 1 criteria) become un-ambiguous using (#2) criteria. 
for eg: {quote} test("SPARK-28344: fail ambiguous self join - column ref in join condition") { val df1 = spark.range(3) val df2 = df1.filter($"id" > 0) @@ -118,29 +139,32 @@ class DataFrameSelfJoinSuite extends QueryTest with SharedSparkSession { withSQLConf( SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key -> "true", SQLConf.CROSS_JOINS_ENABLED.key -> "true") { assertAmbiguousSelfJoin(df1.join(df2, df1("id") > df2("id"))) } } {quote} The above test should not have ambiguity exception thrown as df1("id") and df2("id") are un-ambiguous from perspective of Dataset There is an existing test in DataFrameSelfJoinSuite ` test("SPARK-28344: fail ambiguous self join - column ref in Project") { val df1 = spark.range(3) val df2 = df1.filter($"id" > 0) withSQLConf( SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key -> "false", SQLConf.CROSS_JOINS_ENABLED.key -> "true") { // `df2("id")` actually points to the column of `df1`. checkAnswer(df1.join(df2).select(df2("id")), Seq(0, 0, 1, 1, 2, 2).map(Row(_))) // Alias the dataframe and use qualified column names can fix ambiguous self-join. val aliasedDf1 = df1.alias("left") val aliasedDf2 = df2.as("right") checkAnswer( aliasedDf1.join(aliasedDf2).select($"right.id"), Seq(1, 1, 1, 2, 2, 2).map(Row(_))) } withSQLConf( SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key -> "true", SQLConf.CROSS_JOINS_ENABLED.key -> "true") { // Assertion1 : existing assertAmbiguousSelfJoin(df1.join(df2).select(df2("id"))) // Assertion2 : added by me assertAmbiguousSelfJoin(df2.join(df1).select(df2("id"))) } } ` Here the Assertion1 passes ( that is ambiguous exception is thrown) But the Assertion2 fails ( that is no ambiguous exception is thrown) The only chnage is the join order Logically both the assertions are invalid ( In the sense both should NOT be throwing Exception as from the user's perspective there is no ambiguity. was: The behaviour of
[jira] [Commented] (SPARK-47320) Datasets involving self joins behave in an inconsistent and unintuitive manner
[ https://issues.apache.org/jira/browse/SPARK-47320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17824877#comment-17824877 ] Asif commented on SPARK-47320: -- Opened following PR [https://github.com/apache/spark/pull/45446|https://github.com/apache/spark/pull/45446] > Datasets involving self joins behave in an inconsistent and unintuitive > manner > > > Key: SPARK-47320 > URL: https://issues.apache.org/jira/browse/SPARK-47320 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.1 >Reporter: Asif >Priority: Major > > The behaviour of Datasets involving self joins behave in an unintuitive > manner in terms when AnalysisException is thrown due to ambiguity and when it > works. > Found situations where join order swapping causes query to throw Ambiguity > related exceptions which otherwise passes. Some of the Datasets which from > user perspective are un-ambiguous will result in Analysis Exception getting > thrown. > After testing and fixing a bug , I think the issue lies in inconsistency in > determining what constitutes ambiguous and what is un-ambiguous. > There are two ways to look at resolution regarding ambiguity > 1) ExprId of attributes : This is unintuitive approach as spark users do not > bother with the ExprIds > 2) Column Extraction from the Dataset using df(col) api : Which is the user > visible/understandable Point of View. So determining ambiguity should be > based on this. What is Logically unambiguous from users perspective ( > assuming its is logically correct) , should also be the basis of spark > product, to decide on un-ambiguity. > For Example: > {quote} > val df1 = Seq((1, 2)).toDF("a", "b") > val df2 = Seq((1, 2)).toDF("aa", "bb") > val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), > df2("aa"), df1("b")) > val df3 = df1Joindf2.join(df1, df1Joindf2("aa") === > df1("a")).select(df1("a")) > {quote} > The above code from perspective #1 should throw ambiguity exception, because > the join condition and projection of df3 dataframe, has df1("a) which has > exprId which matches both df1Joindf2 and df1. > But if we look is from perspective of Dataset used to get column, which is > the intent of the user, the expectation is that df1("a) should be resolved > to Dataset df1 being joined, and not > df1Joindf2. If user intended "a" from df1Joindf2, then would have used > df1Joindf2("a") > So In this case , current spark throws Exception as it is using resolution > based on # 1 > But the below Dataframe by the above logic, should also throw Ambiguity > Exception but it passes > {quote} > val df1 = Seq((1, 2)).toDF("a", "b") > val df2 = Seq((1, 2)).toDF("aa", "bb") > val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), > df2("aa"), df1("b")) > df1Joindf2.join(df1, df1Joindf2("a") === df1("a")) > {quote} > The difference in the 2 cases is that in the first case , select is present. > But in the 2nd query, select is not there. > So this implies that in 1st case the df1("a") in projection is causing > ambiguity issue, but same reference in 2nd case, used just in condition, is > considered un-ambiguous. > IMHO , the ambiguity identification criteria should be based totally on #2 > and consistently. > In the DataFrameJoinTest and DataFrameSelfJoinTest, if we go by #2, some of > the tests which are being considered ambiguous ( on # 1 criteria) become > un-ambiguous using (#2) criteria. 
> for eg: > {quote} > test("SPARK-28344: fail ambiguous self join - column ref in join condition") { > val df1 = spark.range(3) > val df2 = df1.filter($"id" > 0) > @@ -118,29 +139,32 @@ class DataFrameSelfJoinSuite extends QueryTest > with SharedSparkSession { > withSQLConf( > SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key -> "true", > SQLConf.CROSS_JOINS_ENABLED.key -> "true") { > assertAmbiguousSelfJoin(df1.join(df2, df1("id") > df2("id"))) > } > } > {quote} > The above test should not have ambiguity exception thrown as df1("id") and > df2("id") are un-ambiguous from perspective of Dataset -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-47320) Datasets involving self joins behave in an inconsistent and unintuitive manner
[ https://issues.apache.org/jira/browse/SPARK-47320?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17824589#comment-17824589 ] Asif commented on SPARK-47320: -- will be linking the bug to an open PR > Datasets involving self joins behave in an inconsistent and unintuitive > manner > > > Key: SPARK-47320 > URL: https://issues.apache.org/jira/browse/SPARK-47320 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.1 >Reporter: Asif >Priority: Major > > The behaviour of Datasets involving self joins behave in an unintuitive > manner in terms when AnalysisException is thrown due to ambiguity and when it > works. > Found situations where join order swapping causes query to throw Ambiguity > related exceptions which otherwise passes. Some of the Datasets which from > user perspective are un-ambiguous will result in Analysis Exception getting > thrown. > After testing and fixing a bug , I think the issue lies in inconsistency in > determining what constitutes ambiguous and what is un-ambiguous. > There are two ways to look at resolution regarding ambiguity > 1) ExprId of attributes : This is unintuitive approach as spark users do not > bother with the ExprIds > 2) Column Extraction from the Dataset using df(col) api : Which is the user > visible/understandable Point of View. So determining ambiguity should be > based on this. What is Logically unambiguous from users perspective ( > assuming its is logically correct) , should also be the basis of spark > product, to decide on un-ambiguity. > For Example: > {quote} > val df1 = Seq((1, 2)).toDF("a", "b") > val df2 = Seq((1, 2)).toDF("aa", "bb") > val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), > df2("aa"), df1("b")) > val df3 = df1Joindf2.join(df1, df1Joindf2("aa") === > df1("a")).select(df1("a")) > {quote} > The above code from perspective #1 should throw ambiguity exception, because > the join condition and projection of df3 dataframe, has df1("a) which has > exprId which matches both df1Joindf2 and df1. > But if we look is from perspective of Dataset used to get column, which is > the intent of the user, the expectation is that df1("a) should be resolved > to Dataset df1 being joined, and not > df1Joindf2. If user intended "a" from df1Joindf2, then would have used > df1Joindf2("a") > So In this case , current spark throws Exception as it is using resolution > based on # 1 > But the below Dataframe by the above logic, should also throw Ambiguity > Exception but it passes > {quote} > val df1 = Seq((1, 2)).toDF("a", "b") > val df2 = Seq((1, 2)).toDF("aa", "bb") > val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), > df2("aa"), df1("b")) > df1Joindf2.join(df1, df1Joindf2("a") === df1("a")) > {quote} > The difference in the 2 cases is that in the first case , select is present. > But in the 2nd query, select is not there. > So this implies that in 1st case the df1("a") in projection is causing > ambiguity issue, but same reference in 2nd case, used just in condition, is > considered un-ambiguous. > IMHO , the ambiguity identification criteria should be based totally on #2 > and consistently. > In the DataFrameJoinTest and DataFrameSelfJoinTest, if we go by #2, some of > the tests which are being considered ambiguous ( on # 1 criteria) become > un-ambiguous using (#2) criteria. 
> for eg: > {quote} > test("SPARK-28344: fail ambiguous self join - column ref in join condition") { > val df1 = spark.range(3) > val df2 = df1.filter($"id" > 0) > @@ -118,29 +139,32 @@ class DataFrameSelfJoinSuite extends QueryTest > with SharedSparkSession { > withSQLConf( > SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key -> "true", > SQLConf.CROSS_JOINS_ENABLED.key -> "true") { > assertAmbiguousSelfJoin(df1.join(df2, df1("id") > df2("id"))) > } > } > {quote} > The above test should not have ambiguity exception thrown as df1("id") and > df2("id") are un-ambiguous from perspective of Dataset -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-47320) Datasets involving self joins behave in an inconsistent and unintuitive manner
Asif created SPARK-47320: Summary: Datasets involving self joins behave in an inconsistent and unintuitive manner Key: SPARK-47320 URL: https://issues.apache.org/jira/browse/SPARK-47320 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.5.1 Reporter: Asif The behaviour of Datasets involving self joins behave in an unintuitive manner in terms when AnalysisException is thrown due to ambiguity and when it works. Found situations where join order swapping causes query to throw Ambiguity related exceptions which otherwise passes. Some of the Datasets which from user perspective are un-ambiguous will result in Analysis Exception getting thrown. After testing and fixing a bug , I think the issue lies in inconsistency in determining what constitutes ambiguous and what is un-ambiguous. There are two ways to look at resolution regarding ambiguity 1) ExprId of attributes : This is unintuitive approach as spark users do not bother with the ExprIds 2) Column Extraction from the Dataset using df(col) api : Which is the user visible/understandable Point of View. So determining ambiguity should be based on this. What is Logically unambiguous from users perspective ( assuming its is logically correct) , should also be the basis of spark product, to decide on un-ambiguity. For Example: {quote} val df1 = Seq((1, 2)).toDF("a", "b") val df2 = Seq((1, 2)).toDF("aa", "bb") val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b")) val df3 = df1Joindf2.join(df1, df1Joindf2("aa") === df1("a")).select(df1("a")) {quote} The above code from perspective #1 should throw ambiguity exception, because the join condition and projection of df3 dataframe, has df1("a) which has exprId which matches both df1Joindf2 and df1. But if we look is from perspective of Dataset used to get column, which is the intent of the user, the expectation is that df1("a) should be resolved to Dataset df1 being joined, and not df1Joindf2. If user intended "a" from df1Joindf2, then would have used df1Joindf2("a") So In this case , current spark throws Exception as it is using resolution based on # 1 But the below Dataframe by the above logic, should also throw Ambiguity Exception but it passes {quote} val df1 = Seq((1, 2)).toDF("a", "b") val df2 = Seq((1, 2)).toDF("aa", "bb") val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"), df2("aa"), df1("b")) df1Joindf2.join(df1, df1Joindf2("a") === df1("a")) {quote} The difference in the 2 cases is that in the first case , select is present. But in the 2nd query, select is not there. So this implies that in 1st case the df1("a") in projection is causing ambiguity issue, but same reference in 2nd case, used just in condition, is considered un-ambiguous. IMHO , the ambiguity identification criteria should be based totally on #2 and consistently. In the DataFrameJoinTest and DataFrameSelfJoinTest, if we go by #2, some of the tests which are being considered ambiguous ( on # 1 criteria) become un-ambiguous using (#2) criteria. 
for eg: {quote} test("SPARK-28344: fail ambiguous self join - column ref in join condition") { val df1 = spark.range(3) val df2 = df1.filter($"id" > 0) @@ -118,29 +139,32 @@ class DataFrameSelfJoinSuite extends QueryTest with SharedSparkSession { withSQLConf( SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key -> "true", SQLConf.CROSS_JOINS_ENABLED.key -> "true") { assertAmbiguousSelfJoin(df1.join(df2, df1("id") > df2("id"))) } } {quote} The above test should not have ambiguity exception thrown as df1("id") and df2("id") are un-ambiguous from perspective of Dataset -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-39441) Speed up DeduplicateRelations
[ https://issues.apache.org/jira/browse/SPARK-39441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17824102#comment-17824102 ] Asif commented on SPARK-39441: -- this issue should be resolved by the PR for ticket [https://issues.apache.org/jira/browse/SPARK-45959|https://issues.apache.org/jira/browse/SPARK-45959] > Speed up DeduplicateRelations > - > > Key: SPARK-39441 > URL: https://issues.apache.org/jira/browse/SPARK-39441 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.3.1 >Reporter: Allison Wang >Assignee: Allison Wang >Priority: Major > Fix For: 3.4.0 > > > Speed up the Analyzer rule DeduplicateRelations -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-39441) Speed up DeduplicateRelations
[ https://issues.apache.org/jira/browse/SPARK-39441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17824102#comment-17824102 ] Asif edited comment on SPARK-39441 at 3/6/24 5:33 PM: -- this issue should be resolved by the PR for ticket https://issues.apache.org/jira/browse/SPARK-45959, but it is still in the open state was (Author: ashahid7): this issue should be resolved by the PR for ticket [https://issues.apache.org/jira/browse/SPARK-45959|https://issues.apache.org/jira/browse/SPARK-45959] > Speed up DeduplicateRelations > - > > Key: SPARK-39441 > URL: https://issues.apache.org/jira/browse/SPARK-39441 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.3.1 >Reporter: Allison Wang >Assignee: Allison Wang >Priority: Major > Fix For: 3.4.0 > > > Speed up the Analyzer rule DeduplicateRelations -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-33152) SPIP: Constraint Propagation code causes OOM issues or increasing compilation time to hours
[ https://issues.apache.org/jira/browse/SPARK-33152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17823510#comment-17823510 ] Asif edited comment on SPARK-33152 at 3/5/24 6:43 PM: -- [~tedjenks] .. Unfortunately I am not a committer. As part of workday , I had opened this Jira and opened a PR to fix this issue completely which required a different logic. The changes are extensive and they were never reviewed or dicussed by OS community. This PR has been in production since past 3 years at Workday. As to why a check is not added, etc,.,: That would be unclean and as such is not easy to implement also in current codebase, because it will result in various other issues like new redundant filters being inferred and other messy bugs as the constraint code is sensitive to constraints coming from each node below and the constraints available at current node, to decide whether to create new filters or not. Constrainst are created per operator node ( project, filter etc) and arbitrary putting a limit on constraints at a given operator , will impact the new filters being created. was (Author: ashahid7): [~tedjenks] .. Unfortunately I am not a committer. As part of workday , I had opened this Jira and opened a PR to fix this issue completely which required a different logic. The changes are extensive and they were never reviewed or dicussed by OS community. This PR has been in production since past 3 years at Workday. As to why a check is not added, etc,.,: That would be unclean and as such is not easy to implement also in current codebase, because it will result in various other issues like new/wrong filters being inferred and other messy bugs as the constraint code is sensitive to constraints coming from each node below and the constraints available at current node, to decide whether to create new filters or not. Constrainst are created per operator node ( project, filter etc) and arbitrary putting a limit on constraints at a given operator , will impact the new filters being created. > SPIP: Constraint Propagation code causes OOM issues or increasing compilation > time to hours > --- > > Key: SPARK-33152 > URL: https://issues.apache.org/jira/browse/SPARK-33152 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.5.0 >Reporter: Asif >Priority: Major > Labels: SPIP > Original Estimate: 168h > Remaining Estimate: 168h > > h2. Q1. What are you trying to do? Articulate your objectives using > absolutely no jargon. > Proposing new algorithm to create, store and use constraints for removing > redundant filters & inferring new filters. > The current algorithm has subpar performance in complex expression scenarios > involving aliases( with certain use cases the compilation time can go into > hours), potential to cause OOM, may miss removing redundant filters in > different scenarios, may miss creating IsNotNull constraints in different > scenarios, does not push compound predicates in Join. > # This issue if not fixed can cause OutOfMemory issue or unacceptable query > compilation times. > Have added a test "plan equivalence with case statements and performance > comparison with benefit of more than 10x conservatively" in > org.apache.spark.sql.catalyst.plans.OptimizedConstraintPropagationSuite. 
> *With this PR the compilation time is 247 ms vs 13958 ms without the change* > # It is more effective in filter pruning as is evident in some of the tests > in org.apache.spark.sql.catalyst.plans.OptimizedConstraintPropagationSuite > where current code is not able to identify the redundant filter in some cases. > # It is able to generate a better optimized plan for join queries as it can > push compound predicates. > # The current logic can miss a lot of possible cases of removing redundant > predicates, as it fails to take into account if same attribute or its aliases > are repeated multiple times in a complex expression. > # There are cases where some of the optimizer rules involving removal of > redundant predicates fail to remove on the basis of constraint data. In some > cases the rule works, just by the virtue of previous rules helping it out to > cover the inaccuracy. That the ConstraintPropagation rule & its function of > removal of redundant filters & addition of new inferred filters is dependent > on the working of some of the other unrelated previous optimizer rules is > behaving, is indicative of issues. > # It does away with all the EqualNullSafe constraints as this logic does not > need those constraints to be created. > # There is at least one test in existing ConstraintPropagationSuite which is > missing a IsNotNull constraints because the code
[jira] [Commented] (SPARK-33152) SPIP: Constraint Propagation code causes OOM issues or increasing compilation time to hours
[ https://issues.apache.org/jira/browse/SPARK-33152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17823512#comment-17823512 ] Asif commented on SPARK-33152: -- other than using my PR, the safe option would be to disable constraint propagation rule via sql conf. though that would mean loosing optimizations related to push down of new filters on the other side of join legs etc, > SPIP: Constraint Propagation code causes OOM issues or increasing compilation > time to hours > --- > > Key: SPARK-33152 > URL: https://issues.apache.org/jira/browse/SPARK-33152 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.5.0 >Reporter: Asif >Priority: Major > Labels: SPIP > Original Estimate: 168h > Remaining Estimate: 168h > > h2. Q1. What are you trying to do? Articulate your objectives using > absolutely no jargon. > Proposing new algorithm to create, store and use constraints for removing > redundant filters & inferring new filters. > The current algorithm has subpar performance in complex expression scenarios > involving aliases( with certain use cases the compilation time can go into > hours), potential to cause OOM, may miss removing redundant filters in > different scenarios, may miss creating IsNotNull constraints in different > scenarios, does not push compound predicates in Join. > # This issue if not fixed can cause OutOfMemory issue or unacceptable query > compilation times. > Have added a test "plan equivalence with case statements and performance > comparison with benefit of more than 10x conservatively" in > org.apache.spark.sql.catalyst.plans.OptimizedConstraintPropagationSuite. > *With this PR the compilation time is 247 ms vs 13958 ms without the change* > # It is more effective in filter pruning as is evident in some of the tests > in org.apache.spark.sql.catalyst.plans.OptimizedConstraintPropagationSuite > where current code is not able to identify the redundant filter in some cases. > # It is able to generate a better optimized plan for join queries as it can > push compound predicates. > # The current logic can miss a lot of possible cases of removing redundant > predicates, as it fails to take into account if same attribute or its aliases > are repeated multiple times in a complex expression. > # There are cases where some of the optimizer rules involving removal of > redundant predicates fail to remove on the basis of constraint data. In some > cases the rule works, just by the virtue of previous rules helping it out to > cover the inaccuracy. That the ConstraintPropagation rule & its function of > removal of redundant filters & addition of new inferred filters is dependent > on the working of some of the other unrelated previous optimizer rules is > behaving, is indicative of issues. > # It does away with all the EqualNullSafe constraints as this logic does not > need those constraints to be created. > # There is at least one test in existing ConstraintPropagationSuite which is > missing a IsNotNull constraints because the code incorrectly generated a > EqualsNullSafeConstraint instead of EqualTo constraint, when using the > existing Constraints code. With these changes, the test correctly creates an > EqualTo constraint, resulting in an inferred IsNotNull constraint > # It does away with the current combinatorial logic of evaluation all the > constraints can cause compilation to run into hours or cause OOM. The number > of constraints stored is exactly the same as the number of filters encountered > h2. Q2. 
What problem is this proposal NOT designed to solve? > It mainly focuses on compile time performance, but in some cases can benefit > run time characteristics too, like inferring IsNotNull filter or pushing down > compound predicates on the join, which currently may get missed/ does not > happen , respectively, by the present code. > h2. Q3. How is it done today, and what are the limits of current practice? > Current ConstraintsPropagation code, pessimistically tries to generates all > the possible combinations of constraints , based on the aliases ( even then > it may miss a lot of combinations if the expression is a complex expression > involving same attribute repeated multiple times within the expression and > there are many aliases to that column). There are query plans in our > production env, which can result in intermediate number of constraints going > into hundreds of thousands, causing OOM or taking time running into hours. > Also there are cases where it incorrectly generates an EqualNullSafe > constraint instead of EqualTo constraint , thus missing a possible IsNull > constraint on column. > Also it only pushes single column predicate on the other
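As a rough sketch of the workaround mentioned in the comment above (disabling the constraint propagation rule through a SQL conf), assuming the standard {{spark.sql.constraintPropagation.enabled}} key and a running SparkSession; as the comment notes, the trade-off is losing the optimizations the rule provides: {code}
// Sketch of the workaround: turn off constraint propagation via SQL conf.
// Trade-off: inferred IsNotNull filters and filters pushed to the other join leg are lost.
spark.conf.set("spark.sql.constraintPropagation.enabled", "false")

// ... run the queries whose compilation was slow or running out of memory ...

// Re-enable it afterwards if desired.
spark.conf.set("spark.sql.constraintPropagation.enabled", "true")
{code}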
[jira] [Commented] (SPARK-33152) SPIP: Constraint Propagation code causes OOM issues or increasing compilation time to hours
[ https://issues.apache.org/jira/browse/SPARK-33152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17823510#comment-17823510 ] Asif commented on SPARK-33152: -- [~tedjenks] .. Unfortunately I am not a committer. As part of workday , I had opened this Jira and opened a PR to fix this issue completely which required a different logic. The changes are extensive and they were never reviewed or dicussed by OS community. This PR has been in production since past 3 years at Workday. As to why a check is not added, etc,.,: That would be unclean and as such is not easy to implement also in current codebase, because it will result in various other issues like new/wrong filters being inferred and other messy bugs as the constraint code is sensitive to constraints coming from each node below and the constraints available at current node, to decide whether to create new filters or not. Constrainst are created per operator node ( project, filter etc) and arbitrary putting a limit on constraints at a given operator , will impact the new filters being created. > SPIP: Constraint Propagation code causes OOM issues or increasing compilation > time to hours > --- > > Key: SPARK-33152 > URL: https://issues.apache.org/jira/browse/SPARK-33152 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.5.0 >Reporter: Asif >Priority: Major > Labels: SPIP > Original Estimate: 168h > Remaining Estimate: 168h > > h2. Q1. What are you trying to do? Articulate your objectives using > absolutely no jargon. > Proposing new algorithm to create, store and use constraints for removing > redundant filters & inferring new filters. > The current algorithm has subpar performance in complex expression scenarios > involving aliases( with certain use cases the compilation time can go into > hours), potential to cause OOM, may miss removing redundant filters in > different scenarios, may miss creating IsNotNull constraints in different > scenarios, does not push compound predicates in Join. > # This issue if not fixed can cause OutOfMemory issue or unacceptable query > compilation times. > Have added a test "plan equivalence with case statements and performance > comparison with benefit of more than 10x conservatively" in > org.apache.spark.sql.catalyst.plans.OptimizedConstraintPropagationSuite. > *With this PR the compilation time is 247 ms vs 13958 ms without the change* > # It is more effective in filter pruning as is evident in some of the tests > in org.apache.spark.sql.catalyst.plans.OptimizedConstraintPropagationSuite > where current code is not able to identify the redundant filter in some cases. > # It is able to generate a better optimized plan for join queries as it can > push compound predicates. > # The current logic can miss a lot of possible cases of removing redundant > predicates, as it fails to take into account if same attribute or its aliases > are repeated multiple times in a complex expression. > # There are cases where some of the optimizer rules involving removal of > redundant predicates fail to remove on the basis of constraint data. In some > cases the rule works, just by the virtue of previous rules helping it out to > cover the inaccuracy. That the ConstraintPropagation rule & its function of > removal of redundant filters & addition of new inferred filters is dependent > on the working of some of the other unrelated previous optimizer rules is > behaving, is indicative of issues. 
> # It does away with all the EqualNullSafe constraints as this logic does not > need those constraints to be created. > # There is at least one test in existing ConstraintPropagationSuite which is > missing a IsNotNull constraints because the code incorrectly generated a > EqualsNullSafeConstraint instead of EqualTo constraint, when using the > existing Constraints code. With these changes, the test correctly creates an > EqualTo constraint, resulting in an inferred IsNotNull constraint > # It does away with the current combinatorial logic of evaluation all the > constraints can cause compilation to run into hours or cause OOM. The number > of constraints stored is exactly the same as the number of filters encountered > h2. Q2. What problem is this proposal NOT designed to solve? > It mainly focuses on compile time performance, but in some cases can benefit > run time characteristics too, like inferring IsNotNull filter or pushing down > compound predicates on the join, which currently may get missed/ does not > happen , respectively, by the present code. > h2. Q3. How is it done today, and what are the limits of current practice? > Current ConstraintsPropagation code, pessimistically tries to generates all > the
[jira] [Commented] (SPARK-33152) SPIP: Constraint Propagation code causes OOM issues or increasing compilation time to hours
[ https://issues.apache.org/jira/browse/SPARK-33152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17823344#comment-17823344 ] Asif commented on SPARK-33152: -- [~tedjenks] The issue has always been there because of the way constraint prop rule works ( due to it permutational logic). A possible cause why it might have become more common could be due to some changes to fix the previously undetected constraints . The more robust the code becomes in detecting the constraints, chances are it would increase the cost of over all constraints code drastically. For eg if we have a projection with multiple aliases, and say these aliases are used in case when expressions and involve functions taking these aliases, the number of constraints created would be enormous and even then the code ( atleast in 3.2) would not be able to cover all the possible constraints.. so my guess is that any changes to increase the sensitivity of constraints identification will affect the cost of the evaluation of constraints.. > SPIP: Constraint Propagation code causes OOM issues or increasing compilation > time to hours > --- > > Key: SPARK-33152 > URL: https://issues.apache.org/jira/browse/SPARK-33152 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.5.0 >Reporter: Asif >Priority: Major > Labels: SPIP > Original Estimate: 168h > Remaining Estimate: 168h > > h2. Q1. What are you trying to do? Articulate your objectives using > absolutely no jargon. > Proposing new algorithm to create, store and use constraints for removing > redundant filters & inferring new filters. > The current algorithm has subpar performance in complex expression scenarios > involving aliases( with certain use cases the compilation time can go into > hours), potential to cause OOM, may miss removing redundant filters in > different scenarios, may miss creating IsNotNull constraints in different > scenarios, does not push compound predicates in Join. > # This issue if not fixed can cause OutOfMemory issue or unacceptable query > compilation times. > Have added a test "plan equivalence with case statements and performance > comparison with benefit of more than 10x conservatively" in > org.apache.spark.sql.catalyst.plans.OptimizedConstraintPropagationSuite. > *With this PR the compilation time is 247 ms vs 13958 ms without the change* > # It is more effective in filter pruning as is evident in some of the tests > in org.apache.spark.sql.catalyst.plans.OptimizedConstraintPropagationSuite > where current code is not able to identify the redundant filter in some cases. > # It is able to generate a better optimized plan for join queries as it can > push compound predicates. > # The current logic can miss a lot of possible cases of removing redundant > predicates, as it fails to take into account if same attribute or its aliases > are repeated multiple times in a complex expression. > # There are cases where some of the optimizer rules involving removal of > redundant predicates fail to remove on the basis of constraint data. In some > cases the rule works, just by the virtue of previous rules helping it out to > cover the inaccuracy. That the ConstraintPropagation rule & its function of > removal of redundant filters & addition of new inferred filters is dependent > on the working of some of the other unrelated previous optimizer rules is > behaving, is indicative of issues. > # It does away with all the EqualNullSafe constraints as this logic does not > need those constraints to be created. 
> # There is at least one test in existing ConstraintPropagationSuite which is > missing a IsNotNull constraints because the code incorrectly generated a > EqualsNullSafeConstraint instead of EqualTo constraint, when using the > existing Constraints code. With these changes, the test correctly creates an > EqualTo constraint, resulting in an inferred IsNotNull constraint > # It does away with the current combinatorial logic of evaluation all the > constraints can cause compilation to run into hours or cause OOM. The number > of constraints stored is exactly the same as the number of filters encountered > h2. Q2. What problem is this proposal NOT designed to solve? > It mainly focuses on compile time performance, but in some cases can benefit > run time characteristics too, like inferring IsNotNull filter or pushing down > compound predicates on the join, which currently may get missed/ does not > happen , respectively, by the present code. > h2. Q3. How is it done today, and what are the limits of current practice? > Current ConstraintsPropagation code, pessimistically tries to generates all > the possible combinations of constraints , based on
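To make the plan shape described in the comment above concrete, here is a hypothetical sketch (column names and sizes invented, and far smaller than the production queries mentioned): a projection with several aliases of the same column that are then reused inside case-when expressions and filters, which is the kind of shape that drives up the number of candidate constraints. {code}
// Hypothetical plan shape only: aliases of the same column reused inside CASE WHEN
// expressions and filters. At real scale this is the shape that makes constraint
// enumeration expensive during optimization.
import org.apache.spark.sql.functions._
import spark.implicits._

val base = spark.range(0, 1000).toDF("c")
val aliased = base.select($"c", $"c".as("a1"), $"c".as("a2"), $"c".as("a3"))
val derived = when($"a1" > 10, $"a2" + $"a3").otherwise($"a1" * 2).as("d")
val query = aliased.select($"c", $"a1", $"a2", $"a3", derived).where($"d" > 100 && $"a2" > 5)

// Forcing optimization makes the constraint-propagation work show up in compile time.
query.queryExecution.optimizedPlan
{code}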
[jira] [Updated] (SPARK-47217) De-duplication of Relations in Joins, can result in plan resolution failure
[ https://issues.apache.org/jira/browse/SPARK-47217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-47217: - Description: In case of some flavours of nested joins involving repetition of relation, the projected columns when passed to the DataFrame.select API , as form of df.column , can result in plan resolution failure due to attribute resolution not happening. A scenario in which this happens is {noformat} Project ( dataframe A.column("col-a") ) | Join2 || Join1 DataFrame A | DataFrame ADataFrame B {noformat} In such cases, If it so happens that Join2 - right leg DataFrame A gets re-aliased due to De-Duplication of relations, and if the project uses Column definition obtained from DataFrame A, its exprId will not match the re-aliased Join2 - right Leg- DataFrame A , causing resolution failure. was: In case of some flavours of self join queries or nested joins involving repetition of relation, the projected columns when passed to the DataFrame.select API , as form of df.column , can result in plan resolution failure due to attribute resolution not happening. A scenario in which this happens is {noformat} Project ( dataframe A.column("col-a") ) | Join2 || Join1 DataFrame A | DataFrame ADataFrame B {noformat} In such cases, If it so happens that Join2 - right leg DataFrame A gets re-aliased due to De-Duplication of relations, and if the project uses Column definition obtained from DataFrame A, its exprId will not match the re-aliased Join2 - right Leg- DataFrame A , causing resolution failure. > De-duplication of Relations in Joins, can result in plan resolution failure > --- > > Key: SPARK-47217 > URL: https://issues.apache.org/jira/browse/SPARK-47217 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.1 >Reporter: Asif >Priority: Major > Labels: Spark-SQL > > In case of some flavours of nested joins involving repetition of relation, > the projected columns when passed to the DataFrame.select API , as form of > df.column , can result in plan resolution failure due to attribute resolution > not happening. > A scenario in which this happens is > {noformat} > > Project ( dataframe A.column("col-a") ) > | > Join2 > || >Join1 DataFrame A > | > DataFrame ADataFrame B > {noformat} > In such cases, If it so happens that Join2 - right leg DataFrame A gets > re-aliased due to De-Duplication of relations, and if the project uses Column > definition obtained from DataFrame A, its exprId will not match the > re-aliased Join2 - right Leg- DataFrame A , causing resolution failure. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
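A hypothetical sketch of the plan shape in the description above (relation and column names invented; whether it actually fails depends on the DeduplicateRelations behaviour the ticket describes): {code}
// Hypothetical sketch: DataFrame A appears in both legs of the outer join, mirroring
// the tree in the description. Names are invented for illustration only.
val dfA = spark.range(0, 5).toDF("col_a")
val dfB = spark.range(0, 5).toDF("col_b")

val join1 = dfA.join(dfB, dfA("col_a") === dfB("col_b"))     // Join1: DataFrame A x DataFrame B
val join2 = join1.join(dfA, join1("col_a") === dfA("col_a")) // Join2: Join1 x DataFrame A

// If the right-leg dfA is re-aliased by de-duplication of relations, the Column captured
// from dfA may carry an exprId that no longer resolves, per the description above.
join2.select(dfA("col_a"))
{code}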
[jira] [Updated] (SPARK-47217) De-duplication of Relations in Joins, can result in plan resolution failure
[ https://issues.apache.org/jira/browse/SPARK-47217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-47217: - Description: In case of some flavours of self join queries or nested joins involving repetition of relation, the projected columns when passed to the DataFrame.select API , as form of df.column , can result in plan resolution failure due to attribute resolution not happening. A scenario in which this happens is {noformat} Project ( dataframe A.column("col-a") ) | Join2 || Join1 DataFrame A | DataFrame ADataFrame B {noformat} In such cases, If it so happens that Join2 - right leg DataFrame A gets re-aliased due to De-Duplication of relations, and if the project uses Column definition obtained from DataFrame A, its exprId will not match the re-aliased Join2 - right Leg- DataFrame A , causing resolution failure. was: In case of some flavours of nested self join queries, the projected columns when passed to the DataFrame.select API , as form of df.column , can result in plan resolution failure due to attribute resolution not happening. A scenario in which this happens is {noformat} Project ( dataframe A.column("col-a") ) | Join2 || Join1 DataFrame A | DataFrame ADataFrame B {noformat} In such cases, If it so happens that Join2 - right leg DataFrame A gets re-aliased due to De-Duplication of relations, and if the project uses Column definition obtained from DataFrame A, its exprId will not match the re-aliased Join2 - right Leg- DataFrame A , causing resolution failure. > De-duplication of Relations in Joins, can result in plan resolution failure > --- > > Key: SPARK-47217 > URL: https://issues.apache.org/jira/browse/SPARK-47217 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.1 >Reporter: Asif >Priority: Major > Labels: Spark-SQL > > In case of some flavours of self join queries or nested joins involving > repetition of relation, the projected columns when passed to the > DataFrame.select API , as form of df.column , can result in plan resolution > failure due to attribute resolution not happening. > A scenario in which this happens is > {noformat} > > Project ( dataframe A.column("col-a") ) > | > Join2 > || >Join1 DataFrame A > | > DataFrame ADataFrame B > {noformat} > In such cases, If it so happens that Join2 - right leg DataFrame A gets > re-aliased due to De-Duplication of relations, and if the project uses Column > definition obtained from DataFrame A, its exprId will not match the > re-aliased Join2 - right Leg- DataFrame A , causing resolution failure. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-47217) De-duplication of Relations in Joins, can result in plan resolution failure
[ https://issues.apache.org/jira/browse/SPARK-47217?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-47217: - Description: In case of some flavours of nested self join queries, the projected columns when passed to the DataFrame.select API , as form of df.column , can result in plan resolution failure due to attribute resolution not happening. A scenario in which this happens is {noformat} Project ( dataframe A.column("col-a") ) | Join2 || Join1 DataFrame A | DataFrame ADataFrame B {noformat} In such cases, If it so happens that Join2 - right leg DataFrame A gets re-aliased due to De-Duplication of relations, and if the project uses Column definition obtained from DataFrame A, its exprId will not match the re-aliased Join2 - right Leg- DataFrame A , causing resolution failure. was: In case of some flavours of nested self join queries, the projected columns when passed to the DataFrame.select API , as form of df.column , can result in plan resolution failure due to attribute resolution not happening. A scenario in which this happens is Project ( dataframe A.column("col-a") ) | Join2 |DataFrame A Join1 | DataFrame ADataFrame B In such cases, If it so happens that Join2 - right leg DataFrame A gets re-aliased due to De-Duplication of relations, and if the project uses Column definition obtained from DataFrame A, its exprId will not match the re-aliased Join2 - right Leg- DataFrame A , causing resolution failure. > De-duplication of Relations in Joins, can result in plan resolution failure > --- > > Key: SPARK-47217 > URL: https://issues.apache.org/jira/browse/SPARK-47217 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.1 >Reporter: Asif >Priority: Major > Labels: Spark-SQL > > In case of some flavours of nested self join queries, the projected columns > when passed to the DataFrame.select API , as form of df.column , can result > in plan resolution failure due to attribute resolution not happening. > A scenario in which this happens is > > {noformat} > > Project ( dataframe A.column("col-a") ) > | > Join2 > || >Join1 DataFrame A > | > DataFrame ADataFrame B > {noformat} > In such cases, If it so happens that Join2 - right leg DataFrame A gets > re-aliased due to De-Duplication of relations, and if the project uses > Column definition obtained from DataFrame A, its exprId will not match the > re-aliased Join2 - right Leg- DataFrame A , causing resolution failure. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-47217) De-duplication of Relations in Joins, can result in plan resolution failure
Asif created SPARK-47217: Summary: De-duplication of Relations in Joins, can result in plan resolution failure Key: SPARK-47217 URL: https://issues.apache.org/jira/browse/SPARK-47217 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.5.1 Reporter: Asif In case of some flavours of nested self join queries, the projected columns, when passed to the DataFrame.select API in the form of df.column, can result in plan resolution failure because attribute resolution does not happen. A scenario in which this happens is:
{noformat}
        Project ( dataframe A.column("col-a") )
                        |
                      Join2
                     /     \
                 Join1     DataFrame A
                /     \
        DataFrame A   DataFrame B
{noformat}
In such cases, if the Join2 right-leg DataFrame A gets re-aliased due to de-duplication of relations, and the project uses a Column definition obtained from DataFrame A, its exprId will not match the re-aliased Join2 right-leg DataFrame A, causing resolution failure. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-46671) InferFiltersFromConstraint rule is creating a redundant filter
[ https://issues.apache.org/jira/browse/SPARK-46671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-46671: - Description: While bringing my old PR, which uses a different approach to the ConstraintPropagation algorithm ([SPARK-33152|https://issues.apache.org/jira/browse/SPARK-33152]), in sync with current master, I noticed a test failure in my branch for SPARK-33152. The failing test is in InferFiltersFromConstraintsSuite: {code} test("SPARK-43095: Avoid Once strategy's idempotence is broken for batch: Infer Filters") { val x = testRelation.as("x") val y = testRelation.as("y") val z = testRelation.as("z") // Removes EqualNullSafe when constructing candidate constraints comparePlans( InferFiltersFromConstraints(x.select($"x.a", $"x.a".as("xa")) .where($"xa" <=> $"x.a" && $"xa" === $"x.a").analyze), x.select($"x.a", $"x.a".as("xa")) .where($"xa".isNotNull && $"x.a".isNotNull && $"xa" <=> $"x.a" && $"xa" === $"x.a").analyze) // Once strategy's idempotence is not broken val originalQuery = x.join(y, condition = Some($"x.a" === $"y.a")) .select($"x.a", $"x.a".as("xa")).as("xy") .join(z, condition = Some($"xy.a" === $"z.a")).analyze val correctAnswer = x.where($"a".isNotNull).join(y.where($"a".isNotNull), condition = Some($"x.a" === $"y.a")) .select($"x.a", $"x.a".as("xa")).as("xy") .join(z.where($"a".isNotNull), condition = Some($"xy.a" === $"z.a")).analyze val optimizedQuery = InferFiltersFromConstraints(originalQuery) comparePlans(optimizedQuery, correctAnswer) comparePlans(InferFiltersFromConstraints(optimizedQuery), correctAnswer) } {code} In the above test, I believe the assertion below is not right: a redundant filter is being created. Of the two IsNotNull constraints, only one should be created: $"xa".isNotNull && $"x.a".isNotNull. Because "xa" is an alias of x("a"), only one IsNotNull constraint is needed. // Removes EqualNullSafe when constructing candidate constraints comparePlans( InferFiltersFromConstraints(x.select($"x.a", $"x.a".as("xa")) .where($"xa" <=> $"x.a" && $"xa" === $"x.a").analyze), x.select($"x.a", $"x.a".as("xa")) .where($"xa".isNotNull && $"x.a".isNotNull && $"xa" <=> $"x.a" && $"xa" === $"x.a").analyze) This is not a big issue, but it highlights the need to take another look at the ConstraintPropagation code and related code. I am filing this Jira so that the constraint code can be tightened and made more robust. 
was: while bring my old PR which uses a different approach to the ConstraintPropagation algorithm ( [SPARK-33152|https://issues.apache.org/jira/browse/SPARK-33152]) in synch with current master, I noticed a test failure in my branch for SPARK-33152: The test which is failing is InferFiltersFromConstraintSuite: {code} test("SPARK-43095: Avoid Once strategy's idempotence is broken for batch: Infer Filters") { val x = testRelation.as("x") val y = testRelation.as("y") val z = testRelation.as("z") // Removes EqualNullSafe when constructing candidate constraints comparePlans( InferFiltersFromConstraints(x.select($"x.a", $"x.a".as("xa")) .where($"xa" <=> $"x.a" && $"xa" === $"x.a").analyze), x.select($"x.a", $"x.a".as("xa")) .where($"xa".isNotNull && $"x.a".isNotNull && $"xa" <=> $"x.a" && $"xa" === $"x.a").analyze) // Once strategy's idempotence is not broken val originalQuery = x.join(y, condition = Some($"x.a" === $"y.a")) .select($"x.a", $"x.a".as("xa")).as("xy") .join(z, condition = Some($"xy.a" === $"z.a")).analyze val correctAnswer = x.where($"a".isNotNull).join(y.where($"a".isNotNull), condition = Some($"x.a" === $"y.a")) .select($"x.a", $"x.a".as("xa")).as("xy") .join(z.where($"a".isNotNull), condition = Some($"xy.a" === $"z.a")).analyze val optimizedQuery = InferFiltersFromConstraints(originalQuery) comparePlans(optimizedQuery, correctAnswer) comparePlans(InferFiltersFromConstraints(optimizedQuery), correctAnswer) } {code} In the above test, I believe the below assertion is not proper. There is a redundant filter which is getting created. Out of these two isNotNull constraints, only one should be created. $"xa".isNotNull && $"x.a".isNotNull Because presence of (xa#0 = a#0), automatically implies that is one attribute is not null, the other also has to be not null. // Removes EqualNullSafe when constructing candidate constraints comparePlans( InferFiltersFromConstraints(x.select($"x.a", $"x.a".as("xa")) .where($"xa" <=> $"x.a" && $"xa" === $"x.a").analyze), x.select($"x.a", $"x.a".as("xa")) .where($"xa".isNotNull && $"x.a".isNotNull && $"xa" <=> $"x.a" && $"xa" === $"x.a").analyze) This is not a big issue, but it highlights the
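A small sketch (not from the ticket) of how the inferred filters discussed above can be observed through the DataFrame API instead of the catalyst DSL, assuming a spark-shell session where {{spark}} is available: {code}
// Sketch: observe the IsNotNull filters InferFiltersFromConstraints adds for the
// alias pattern above (xa is an alias of a, with xa === a in the filter).
import spark.implicits._

val x = spark.range(0, 5).toDF("a")
val df = x.select($"a", $"a".as("xa")).where($"xa" === $"a")

// The optimized plan shows the inferred IsNotNull predicates.
println(df.queryExecution.optimizedPlan.numberedTreeString)
{code}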
[jira] [Reopened] (SPARK-46671) InferFiltersFromConstraint rule is creating a redundant filter
[ https://issues.apache.org/jira/browse/SPARK-46671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif reopened SPARK-46671: -- After further analysis , I believe , that what I said originally in the ticket is valid and that the code Does create a redundant constraint. The reason is "xa" is an alias of "a", so there should be a IsNotNull constraint on only one of the attribute and not both. > InferFiltersFromConstraint rule is creating a redundant filter > -- > > Key: SPARK-46671 > URL: https://issues.apache.org/jira/browse/SPARK-46671 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.0 >Reporter: Asif >Priority: Minor > Labels: SQL, catalyst > > while bring my old PR which uses a different approach to the > ConstraintPropagation algorithm ( > [SPARK-33152|https://issues.apache.org/jira/browse/SPARK-33152]) in synch > with current master, I noticed a test failure in my branch for SPARK-33152: > The test which is failing is > InferFiltersFromConstraintSuite: > {code} > test("SPARK-43095: Avoid Once strategy's idempotence is broken for batch: > Infer Filters") { > val x = testRelation.as("x") > val y = testRelation.as("y") > val z = testRelation.as("z") > // Removes EqualNullSafe when constructing candidate constraints > comparePlans( > InferFiltersFromConstraints(x.select($"x.a", $"x.a".as("xa")) > .where($"xa" <=> $"x.a" && $"xa" === $"x.a").analyze), > x.select($"x.a", $"x.a".as("xa")) > .where($"xa".isNotNull && $"x.a".isNotNull && $"xa" <=> $"x.a" && > $"xa" === $"x.a").analyze) > // Once strategy's idempotence is not broken > val originalQuery = > x.join(y, condition = Some($"x.a" === $"y.a")) > .select($"x.a", $"x.a".as("xa")).as("xy") > .join(z, condition = Some($"xy.a" === $"z.a")).analyze > val correctAnswer = > x.where($"a".isNotNull).join(y.where($"a".isNotNull), condition = > Some($"x.a" === $"y.a")) > .select($"x.a", $"x.a".as("xa")).as("xy") > .join(z.where($"a".isNotNull), condition = Some($"xy.a" === > $"z.a")).analyze > val optimizedQuery = InferFiltersFromConstraints(originalQuery) > comparePlans(optimizedQuery, correctAnswer) > comparePlans(InferFiltersFromConstraints(optimizedQuery), correctAnswer) > } > {code} > In the above test, I believe the below assertion is not proper. > There is a redundant filter which is getting created. > Out of these two isNotNull constraints, only one should be created. > $"xa".isNotNull && $"x.a".isNotNull > Because presence of (xa#0 = a#0), automatically implies that is one > attribute is not null, the other also has to be not null. > // Removes EqualNullSafe when constructing candidate constraints > comparePlans( > InferFiltersFromConstraints(x.select($"x.a", $"x.a".as("xa")) > .where($"xa" <=> $"x.a" && $"xa" === $"x.a").analyze), > x.select($"x.a", $"x.a".as("xa")) > .where($"xa".isNotNull && $"x.a".isNotNull && $"xa" <=> $"x.a" && > $"xa" === $"x.a").analyze) > This is not a big issue, but it highlights the need to take a relook at the > code of ConstraintPropagation and related code. > I am filing this jira so that constraint code can be tightened/made more > robust. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-46671) InferFiltersFromConstraint rule is creating a redundant filter
[ https://issues.apache.org/jira/browse/SPARK-46671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif resolved SPARK-46671. -- Resolution: Not A Bug > InferFiltersFromConstraint rule is creating a redundant filter > -- > > Key: SPARK-46671 > URL: https://issues.apache.org/jira/browse/SPARK-46671 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.0 >Reporter: Asif >Priority: Minor > Labels: SQL, catalyst > > while bring my old PR which uses a different approach to the > ConstraintPropagation algorithm ( > [SPARK-33152|https://issues.apache.org/jira/browse/SPARK-33152]) in synch > with current master, I noticed a test failure in my branch for SPARK-33152: > The test which is failing is > InferFiltersFromConstraintSuite: > {code} > test("SPARK-43095: Avoid Once strategy's idempotence is broken for batch: > Infer Filters") { > val x = testRelation.as("x") > val y = testRelation.as("y") > val z = testRelation.as("z") > // Removes EqualNullSafe when constructing candidate constraints > comparePlans( > InferFiltersFromConstraints(x.select($"x.a", $"x.a".as("xa")) > .where($"xa" <=> $"x.a" && $"xa" === $"x.a").analyze), > x.select($"x.a", $"x.a".as("xa")) > .where($"xa".isNotNull && $"x.a".isNotNull && $"xa" <=> $"x.a" && > $"xa" === $"x.a").analyze) > // Once strategy's idempotence is not broken > val originalQuery = > x.join(y, condition = Some($"x.a" === $"y.a")) > .select($"x.a", $"x.a".as("xa")).as("xy") > .join(z, condition = Some($"xy.a" === $"z.a")).analyze > val correctAnswer = > x.where($"a".isNotNull).join(y.where($"a".isNotNull), condition = > Some($"x.a" === $"y.a")) > .select($"x.a", $"x.a".as("xa")).as("xy") > .join(z.where($"a".isNotNull), condition = Some($"xy.a" === > $"z.a")).analyze > val optimizedQuery = InferFiltersFromConstraints(originalQuery) > comparePlans(optimizedQuery, correctAnswer) > comparePlans(InferFiltersFromConstraints(optimizedQuery), correctAnswer) > } > {code} > In the above test, I believe the below assertion is not proper. > There is a redundant filter which is getting created. > Out of these two isNotNull constraints, only one should be created. > $"xa".isNotNull && $"x.a".isNotNull > Because presence of (xa#0 = a#0), automatically implies that is one > attribute is not null, the other also has to be not null. > // Removes EqualNullSafe when constructing candidate constraints > comparePlans( > InferFiltersFromConstraints(x.select($"x.a", $"x.a".as("xa")) > .where($"xa" <=> $"x.a" && $"xa" === $"x.a").analyze), > x.select($"x.a", $"x.a".as("xa")) > .where($"xa".isNotNull && $"x.a".isNotNull && $"xa" <=> $"x.a" && > $"xa" === $"x.a").analyze) > This is not a big issue, but it highlights the need to take a relook at the > code of ConstraintPropagation and related code. > I am filing this jira so that constraint code can be tightened/made more > robust. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-46671) InferFiltersFromConstraint rule is creating a redundant filter
[ https://issues.apache.org/jira/browse/SPARK-46671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805434#comment-17805434 ] Asif commented on SPARK-46671: -- on further thoughts , I am wrong.. There should be 2 separate isNotNull constraints.. > InferFiltersFromConstraint rule is creating a redundant filter > -- > > Key: SPARK-46671 > URL: https://issues.apache.org/jira/browse/SPARK-46671 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.0 >Reporter: Asif >Priority: Minor > Labels: SQL, catalyst > > while bring my old PR which uses a different approach to the > ConstraintPropagation algorithm ( > [SPARK-33152|https://issues.apache.org/jira/browse/SPARK-33152]) in synch > with current master, I noticed a test failure in my branch for SPARK-33152: > The test which is failing is > InferFiltersFromConstraintSuite: > {code} > test("SPARK-43095: Avoid Once strategy's idempotence is broken for batch: > Infer Filters") { > val x = testRelation.as("x") > val y = testRelation.as("y") > val z = testRelation.as("z") > // Removes EqualNullSafe when constructing candidate constraints > comparePlans( > InferFiltersFromConstraints(x.select($"x.a", $"x.a".as("xa")) > .where($"xa" <=> $"x.a" && $"xa" === $"x.a").analyze), > x.select($"x.a", $"x.a".as("xa")) > .where($"xa".isNotNull && $"x.a".isNotNull && $"xa" <=> $"x.a" && > $"xa" === $"x.a").analyze) > // Once strategy's idempotence is not broken > val originalQuery = > x.join(y, condition = Some($"x.a" === $"y.a")) > .select($"x.a", $"x.a".as("xa")).as("xy") > .join(z, condition = Some($"xy.a" === $"z.a")).analyze > val correctAnswer = > x.where($"a".isNotNull).join(y.where($"a".isNotNull), condition = > Some($"x.a" === $"y.a")) > .select($"x.a", $"x.a".as("xa")).as("xy") > .join(z.where($"a".isNotNull), condition = Some($"xy.a" === > $"z.a")).analyze > val optimizedQuery = InferFiltersFromConstraints(originalQuery) > comparePlans(optimizedQuery, correctAnswer) > comparePlans(InferFiltersFromConstraints(optimizedQuery), correctAnswer) > } > {code} > In the above test, I believe the below assertion is not proper. > There is a redundant filter which is getting created. > Out of these two isNotNull constraints, only one should be created. > $"xa".isNotNull && $"x.a".isNotNull > Because presence of (xa#0 = a#0), automatically implies that is one > attribute is not null, the other also has to be not null. > // Removes EqualNullSafe when constructing candidate constraints > comparePlans( > InferFiltersFromConstraints(x.select($"x.a", $"x.a".as("xa")) > .where($"xa" <=> $"x.a" && $"xa" === $"x.a").analyze), > x.select($"x.a", $"x.a".as("xa")) > .where($"xa".isNotNull && $"x.a".isNotNull && $"xa" <=> $"x.a" && > $"xa" === $"x.a").analyze) > This is not a big issue, but it highlights the need to take a relook at the > code of ConstraintPropagation and related code. > I am filing this jira so that constraint code can be tightened/made more > robust. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-46671) InferFiltersFromConstraint rule is creating a redundant filter
[ https://issues.apache.org/jira/browse/SPARK-46671?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17805435#comment-17805435 ] Asif commented on SPARK-46671: -- so closing the ticket > InferFiltersFromConstraint rule is creating a redundant filter > -- > > Key: SPARK-46671 > URL: https://issues.apache.org/jira/browse/SPARK-46671 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.0 >Reporter: Asif >Priority: Minor > Labels: SQL, catalyst > > while bring my old PR which uses a different approach to the > ConstraintPropagation algorithm ( > [SPARK-33152|https://issues.apache.org/jira/browse/SPARK-33152]) in synch > with current master, I noticed a test failure in my branch for SPARK-33152: > The test which is failing is > InferFiltersFromConstraintSuite: > {code} > test("SPARK-43095: Avoid Once strategy's idempotence is broken for batch: > Infer Filters") { > val x = testRelation.as("x") > val y = testRelation.as("y") > val z = testRelation.as("z") > // Removes EqualNullSafe when constructing candidate constraints > comparePlans( > InferFiltersFromConstraints(x.select($"x.a", $"x.a".as("xa")) > .where($"xa" <=> $"x.a" && $"xa" === $"x.a").analyze), > x.select($"x.a", $"x.a".as("xa")) > .where($"xa".isNotNull && $"x.a".isNotNull && $"xa" <=> $"x.a" && > $"xa" === $"x.a").analyze) > // Once strategy's idempotence is not broken > val originalQuery = > x.join(y, condition = Some($"x.a" === $"y.a")) > .select($"x.a", $"x.a".as("xa")).as("xy") > .join(z, condition = Some($"xy.a" === $"z.a")).analyze > val correctAnswer = > x.where($"a".isNotNull).join(y.where($"a".isNotNull), condition = > Some($"x.a" === $"y.a")) > .select($"x.a", $"x.a".as("xa")).as("xy") > .join(z.where($"a".isNotNull), condition = Some($"xy.a" === > $"z.a")).analyze > val optimizedQuery = InferFiltersFromConstraints(originalQuery) > comparePlans(optimizedQuery, correctAnswer) > comparePlans(InferFiltersFromConstraints(optimizedQuery), correctAnswer) > } > {code} > In the above test, I believe the below assertion is not proper. > There is a redundant filter which is getting created. > Out of these two isNotNull constraints, only one should be created. > $"xa".isNotNull && $"x.a".isNotNull > Because presence of (xa#0 = a#0), automatically implies that is one > attribute is not null, the other also has to be not null. > // Removes EqualNullSafe when constructing candidate constraints > comparePlans( > InferFiltersFromConstraints(x.select($"x.a", $"x.a".as("xa")) > .where($"xa" <=> $"x.a" && $"xa" === $"x.a").analyze), > x.select($"x.a", $"x.a".as("xa")) > .where($"xa".isNotNull && $"x.a".isNotNull && $"xa" <=> $"x.a" && > $"xa" === $"x.a").analyze) > This is not a big issue, but it highlights the need to take a relook at the > code of ConstraintPropagation and related code. > I am filing this jira so that constraint code can be tightened/made more > robust. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-46671) InferFiltersFromConstraint rule is creating a redundant filter
Asif created SPARK-46671: Summary: InferFiltersFromConstraint rule is creating a redundant filter Key: SPARK-46671 URL: https://issues.apache.org/jira/browse/SPARK-46671 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.5.0 Reporter: Asif while bring my old PR which uses a different approach to the ConstraintPropagation algorithm ( [SPARK-33152|https://issues.apache.org/jira/browse/SPARK-33152]) in synch with current master, I noticed a test failure in my branch for SPARK-33152: The test which is failing is InferFiltersFromConstraintSuite: {code} test("SPARK-43095: Avoid Once strategy's idempotence is broken for batch: Infer Filters") { val x = testRelation.as("x") val y = testRelation.as("y") val z = testRelation.as("z") // Removes EqualNullSafe when constructing candidate constraints comparePlans( InferFiltersFromConstraints(x.select($"x.a", $"x.a".as("xa")) .where($"xa" <=> $"x.a" && $"xa" === $"x.a").analyze), x.select($"x.a", $"x.a".as("xa")) .where($"xa".isNotNull && $"x.a".isNotNull && $"xa" <=> $"x.a" && $"xa" === $"x.a").analyze) // Once strategy's idempotence is not broken val originalQuery = x.join(y, condition = Some($"x.a" === $"y.a")) .select($"x.a", $"x.a".as("xa")).as("xy") .join(z, condition = Some($"xy.a" === $"z.a")).analyze val correctAnswer = x.where($"a".isNotNull).join(y.where($"a".isNotNull), condition = Some($"x.a" === $"y.a")) .select($"x.a", $"x.a".as("xa")).as("xy") .join(z.where($"a".isNotNull), condition = Some($"xy.a" === $"z.a")).analyze val optimizedQuery = InferFiltersFromConstraints(originalQuery) comparePlans(optimizedQuery, correctAnswer) comparePlans(InferFiltersFromConstraints(optimizedQuery), correctAnswer) } {code} In the above test, I believe the below assertion is not proper. There is a redundant filter which is getting created. Out of these two isNotNull constraints, only one should be created. $"xa".isNotNull && $"x.a".isNotNull Because presence of (xa#0 = a#0), automatically implies that is one attribute is not null, the other also has to be not null. // Removes EqualNullSafe when constructing candidate constraints comparePlans( InferFiltersFromConstraints(x.select($"x.a", $"x.a".as("xa")) .where($"xa" <=> $"x.a" && $"xa" === $"x.a").analyze), x.select($"x.a", $"x.a".as("xa")) .where($"xa".isNotNull && $"x.a".isNotNull && $"xa" <=> $"x.a" && $"xa" === $"x.a").analyze) This is not a big issue, but it highlights the need to take a relook at the code of ConstraintPropagation and related code. I am filing this jira so that constraint code can be tightened/made more robust. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
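As a side note for readers following the thread above: the conclusion reached (two separate IsNotNull constraints) can be illustrated with a minimal Scala sketch. The helper name inferIsNotNull below is made up for illustration and is not the actual method in Spark's constraint-propagation code, but it shows why an equality constraint legitimately yields a not-null constraint for each referenced attribute, even when one attribute is an alias of the other at a lower level of the plan.
{code}
import org.apache.spark.sql.catalyst.expressions.{Attribute, EqualTo, Expression, IsNotNull}

// An EqualTo(a, b) predicate can only evaluate to true when both sides are
// non-null, so both IsNotNull(a) and IsNotNull(b) are valid inferred constraints.
def inferIsNotNull(constraints: Set[Expression]): Set[Expression] =
  constraints.flatMap {
    case EqualTo(l: Attribute, r: Attribute) => Seq(IsNotNull(l), IsNotNull(r))
    case _ => Seq.empty
  }
{code}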
[jira] [Updated] (SPARK-45959) SPIP: Abusing DataSet.withColumn can cause huge tree with severe perf degradation
[ https://issues.apache.org/jira/browse/SPARK-45959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-45959: - Description: Though documentation clearly recommends to add all columns in a single shot, but in reality is difficult to expect customer to modify their code, as in spark2 the rules in analyzer were such that they did not do deep tree traversal. Moreover in Spark3 , the plans are cloned before giving to analyzer , optimizer etc which was not the case in Spark2. All these things have resulted in query time being increased from 5 min to 2 - 3 hrs. Many times the columns are added to plan via some for loop logic which just keeps adding new computation based on some rule. So, my suggestion is to Collapse the Projects early, once the analysis of the logical plan is done, but before the plan gets assigned to the field variable in QueryExecution. The PR for the above is ready for review. The major change is in the way the lookup is performed in CacheManager. I have described the logic in the PR and have added multiple tests. was: Though documentation clearly recommends to add all columns in a single shot, but in reality is difficult to expect customer to modify their code, as in spark2 the rules in analyzer were such that they did not do deep tree traversal. Moreover in Spark3 , the plans are cloned before giving to analyzer , optimizer etc which was not the case in Spark2. All these things have resulted in query time being increased from 5 min to 2 - 3 hrs. Many times the columns are added to plan via some for loop logic which just keeps adding new computation based on some rule. So, my suggestion is to do some intial check in the withColumn api, before creating a new projection, like if all the existing columns are still being projected, and the new column being added has an expression which is not depending on the output of the top node , but its child, then instead of adding a new project, the column can be added to the existing node. For starts, may be we can just handle Project node .. > SPIP: Abusing DataSet.withColumn can cause huge tree with severe perf > degradation > - > > Key: SPARK-45959 > URL: https://issues.apache.org/jira/browse/SPARK-45959 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.5.1 >Reporter: Asif >Priority: Minor > Labels: pull-request-available > > Though documentation clearly recommends to add all columns in a single shot, > but in reality is difficult to expect customer to modify their code, as in > spark2 the rules in analyzer were such that they did not do deep tree > traversal. Moreover in Spark3 , the plans are cloned before giving to > analyzer , optimizer etc which was not the case in Spark2. > All these things have resulted in query time being increased from 5 min to 2 > - 3 hrs. > Many times the columns are added to plan via some for loop logic which just > keeps adding new computation based on some rule. > So, my suggestion is to Collapse the Projects early, once the analysis of > the logical plan is done, but before the plan gets assigned to the field > variable in QueryExecution. > The PR for the above is ready for review. > The major change is in the way the lookup is performed in CacheManager. > I have described the logic in the PR and have added multiple tests. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
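As a rough illustration of the "collapse the Projects early" idea in the updated description above: Spark already has a CollapseProject optimizer rule that merges adjacent Project nodes, and the sketch below simply applies it to an analyzed plan. Where exactly such a call would sit inside QueryExecution, and how the CacheManager lookup changes, is part of the linked PR and is not shown here; this is only the shape of the idea.
{code}
import org.apache.spark.sql.catalyst.optimizer.CollapseProject
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Merge stacked Project nodes produced by repeated withColumn calls into a
// single Project, so later phases traverse a much smaller tree.
def collapseEarly(analyzedPlan: LogicalPlan): LogicalPlan =
  CollapseProject(analyzedPlan)
{code}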
[jira] [Updated] (SPARK-45959) SPIP: Abusing DataSet.withColumn can cause huge tree with severe perf degradation
[ https://issues.apache.org/jira/browse/SPARK-45959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-45959: - Summary: SPIP: Abusing DataSet.withColumn can cause huge tree with severe perf degradation (was: Abusing DataSet.withColumn can cause huge tree with severe perf degradation) > SPIP: Abusing DataSet.withColumn can cause huge tree with severe perf > degradation > - > > Key: SPARK-45959 > URL: https://issues.apache.org/jira/browse/SPARK-45959 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.5.1 >Reporter: Asif >Priority: Minor > Labels: pull-request-available > > Though documentation clearly recommends to add all columns in a single shot, > but in reality is difficult to expect customer to modify their code, as in > spark2 the rules in analyzer were such that they did not do deep tree > traversal. Moreover in Spark3 , the plans are cloned before giving to > analyzer , optimizer etc which was not the case in Spark2. > All these things have resulted in query time being increased from 5 min to 2 > - 3 hrs. > Many times the columns are added to plan via some for loop logic which just > keeps adding new computation based on some rule. > So, my suggestion is to do some intial check in the withColumn api, before > creating a new projection, like if all the existing columns are still being > projected, and the new column being added has an expression which is not > depending on the output of the top node , but its child, then instead of > adding a new project, the column can be added to the existing node. > For starts, may be we can just handle Project node .. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45959) Abusing DataSet.withColumn can cause huge tree with severe perf degradation
[ https://issues.apache.org/jira/browse/SPARK-45959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-45959: - Priority: Minor (was: Major) > Abusing DataSet.withColumn can cause huge tree with severe perf degradation > --- > > Key: SPARK-45959 > URL: https://issues.apache.org/jira/browse/SPARK-45959 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.5.1 >Reporter: Asif >Priority: Minor > > Though documentation clearly recommends to add all columns in a single shot, > but in reality is difficult to expect customer to modify their code, as in > spark2 the rules in analyzer were such that they did not do deep tree > traversal. Moreover in Spark3 , the plans are cloned before giving to > analyzer , optimizer etc which was not the case in Spark2. > All these things have resulted in query time being increased from 5 min to 2 > - 3 hrs. > Many times the columns are added to plan via some for loop logic which just > keeps adding new computation based on some rule. > So, my suggestion is to do some intial check in the withColumn api, before > creating a new projection, like if all the existing columns are still being > projected, and the new column being added has an expression which is not > depending on the output of the top node , but its child, then instead of > adding a new project, the column can be added to the existing node. > For starts, may be we can just handle Project node .. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-45959) Abusing DataSet.withColumn can cause huge tree with severe perf degradation
[ https://issues.apache.org/jira/browse/SPARK-45959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17786941#comment-17786941 ] Asif commented on SPARK-45959: -- will create a PR for the same.. > Abusing DataSet.withColumn can cause huge tree with severe perf degradation > --- > > Key: SPARK-45959 > URL: https://issues.apache.org/jira/browse/SPARK-45959 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.5.1 >Reporter: Asif >Priority: Major > > Though documentation clearly recommends to add all columns in a single shot, > but in reality is difficult to expect customer to modify their code, as in > spark2 the rules in analyzer were such that they did not do deep tree > traversal. Moreover in Spark3 , the plans are cloned before giving to > analyzer , optimizer etc which was not the case in Spark2. > All these things have resulted in query time being increased from 5 min to 2 > - 3 hrs. > Many times the columns are added to plan via some for loop logic which just > keeps adding new computation based on some rule. > So, my suggestion is to do some intial check in the withColumn api, before > creating a new projection, like if all the existing columns are still being > projected, and the new column being added has an expression which is not > depending on the output of the top node , but its child, then instead of > adding a new project, the column can be added to the existing node. > For starts, may be we can just handle Project node .. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-45959) Abusing DataSet.withColumn can cause huge tree with severe perf degradation
Asif created SPARK-45959: Summary: Abusing DataSet.withColumn can cause huge tree with severe perf degradation Key: SPARK-45959 URL: https://issues.apache.org/jira/browse/SPARK-45959 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.5.1 Reporter: Asif Though the documentation clearly recommends adding all columns in a single shot, in reality it is difficult to expect customers to modify their code, since in Spark 2 the analyzer rules did not do deep tree traversal. Moreover, in Spark 3 the plans are cloned before being handed to the analyzer, optimizer etc., which was not the case in Spark 2. All of this has resulted in query time increasing from 5 min to 2 - 3 hrs. Many times the columns are added to the plan via some for-loop logic which just keeps adding new computation based on some rule. So, my suggestion is to do an initial check in the withColumn API, before creating a new projection: if all the existing columns are still being projected, and the new column's expression does not depend on the output of the top node but on its child, then instead of adding a new Project, the column can be added to the existing node. For a start, maybe we can just handle the Project node. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
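A small, hypothetical example of the pattern described above (the column names and loop bound are made up): each withColumn call wraps the plan in another Project, so adding columns in a loop yields a very deep tree, while building the columns first and adding them in a single select keeps the tree flat. Assumes a SparkSession named spark is in scope.
{code}
import org.apache.spark.sql.functions.col

// Anti-pattern: every iteration wraps the plan in one more Project node,
// so 500 iterations produce a tree roughly 500 Projects deep.
var deep = spark.range(0, 5).toDF("a")
for (i <- 1 to 500) {
  deep = deep.withColumn(s"c$i", col("a") + i)
}

// Preferred: compute all the new columns up front and add them in one shot,
// which yields a single Project over the base relation.
val newCols = (1 to 500).map(i => (col("a") + i).as(s"c$i"))
val flat = spark.range(0, 5).toDF("a").select((col("*") +: newCols): _*)
{code}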
[jira] [Commented] (SPARK-45943) DataSourceV2Relation.computeStats throws IllegalStateException in test mode
[ https://issues.apache.org/jira/browse/SPARK-45943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17786652#comment-17786652 ] Asif commented on SPARK-45943: -- thanks [~wforget] for the input. If you have a solution, please open a PR; otherwise I can give it a shot. > DataSourceV2Relation.computeStats throws IllegalStateException in test mode > --- > > Key: SPARK-45943 > URL: https://issues.apache.org/jira/browse/SPARK-45943 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.1 >Reporter: Asif >Priority: Major > > This issue surfaces when the new unit test of PR > [SPARK-45866|https://github.com/apache/spark/pull/43824] is added -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45866) Reuse of exchange in AQE does not happen when run time filters are pushed down to the underlying Scan ( like iceberg )
[ https://issues.apache.org/jira/browse/SPARK-45866?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-45866: - Labels: pull-request-available (was: ) > Reuse of exchange in AQE does not happen when run time filters are pushed > down to the underlying Scan ( like iceberg ) > -- > > Key: SPARK-45866 > URL: https://issues.apache.org/jira/browse/SPARK-45866 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.1 >Reporter: Asif >Priority: Major > Labels: pull-request-available > > In certain types of queries for eg TPCDS Query 14b, the reuse of exchange > does not happen in AQE , resulting in perf degradation. > The spark TPCDS tests are unable to catch the problem, because the > InMemoryScan used for testing do not implement the equals & hashCode > correctly , in the sense , that they do take into account the pushed down run > time filters. > In concrete Scan implementations, for eg iceberg's SparkBatchQueryScan , the > equality check , apart from other things, also involves Runtime Filters > pushed ( which is correct). > In spark the issue is this: > For a given stage being materialized, just before materialization starts, > the run time filters are confined to the BatchScanExec level. > Only when the actual RDD corresponding to the BatchScanExec, is being > evaluated, do the runtime filters get pushed to the underlying Scan. > Now if a new stage is created and it checks in the stageCache using its > canonicalized plan to see if a stage can be reused, it fails to find the > r-usable stage even if the stage exists, because the canonicalized spark > plan present in the stage cache, has now the run time filters pushed to the > Scan , so the incoming canonicalized spark plan does not match the key as > their underlying scans differ . that is incoming spark plan's scan does not > have runtime filters , while the canonicalized spark plan present as key in > the stage cache has the scan with runtime filters pushed. > The fix as I have worked is to provide, two methods in the > SupportsRuntimeV2Filtering interface , > default boolean equalToIgnoreRuntimeFilters(Scan other) { > return this.equals(other); > } > default int hashCodeIgnoreRuntimeFilters() { > return this.hashCode(); > } > In the BatchScanExec, if the scan implements SupportsRuntimeV2Filtering, then > instead of batch.equals, it should call scan.equalToIgnoreRuntimeFilters > And the underlying Scan implementations should provide equality which > excludes run time filters. > Similarly the hashCode of BatchScanExec, should use > scan.hashCodeIgnoreRuntimeFilters instead of ( batch.hashCode). > Will be creating a PR with bug test for review. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
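A hedged Scala sketch of the proposal above. SupportsRuntimeV2Filtering is the existing DataSource V2 interface, but equalToIgnoreRuntimeFilters and hashCodeIgnoreRuntimeFilters are the additions this ticket proposes, so they are shown here as a separate illustrative trait together with helper functions standing in for the BatchScanExec-side change; none of the names below are shipped Spark API.
{code}
import org.apache.spark.sql.connector.read.Scan

// Filter-agnostic equality a concrete Scan (e.g. iceberg's SparkBatchQueryScan)
// would override to compare everything except the pushed runtime filters.
trait ScanEqualityIgnoringRuntimeFilters { self: Scan =>
  def equalToIgnoreRuntimeFilters(other: Scan): Boolean = self == other
  def hashCodeIgnoreRuntimeFilters(): Int = self.hashCode()
}

// What the BatchScanExec-side comparison would do: prefer the filter-agnostic
// methods when the scan opts in, fall back to plain equality otherwise. With
// this in place, a stage whose scan already has runtime filters pushed can
// still match the canonicalized plan of an incoming stage in the AQE stage cache.
def scanEquals(s1: Scan, s2: Scan): Boolean = s1 match {
  case f: ScanEqualityIgnoringRuntimeFilters => f.equalToIgnoreRuntimeFilters(s2)
  case _ => s1 == s2
}

def scanHashCode(s: Scan): Int = s match {
  case f: ScanEqualityIgnoringRuntimeFilters => f.hashCodeIgnoreRuntimeFilters()
  case _ => s.hashCode()
}
{code}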
[jira] [Created] (SPARK-45943) DataSourceV2Relation.computeStats throws IllegalStateException in test mode
Asif created SPARK-45943: Summary: DataSourceV2Relation.computeStats throws IllegalStateException in test mode Key: SPARK-45943 URL: https://issues.apache.org/jira/browse/SPARK-45943 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.5.1 Reporter: Asif This issue surfaces when the new unit test of PR [SPARK-45866|https://github.com/apache/spark/pull/43824] is added -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Closed] (SPARK-45924) Canonicalization of SubqueryAdaptiveBroadcastExec is broken and is not equivalent with SubqueryBroadcastExec
[ https://issues.apache.org/jira/browse/SPARK-45924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif closed SPARK-45924. this is not a bug > Canonicalization of SubqueryAdaptiveBroadcastExec is broken and is not > equivalent with SubqueryBroadcastExec > > > Key: SPARK-45924 > URL: https://issues.apache.org/jira/browse/SPARK-45924 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.1 >Reporter: Asif >Priority: Major > Labels: pull-request-available > > while writing bug test for > [SPARK-45866|https://issues.apache.org/jira/projects/SPARK/issues/SPARK-45866], > found that canonicalization of SubqueryAdaptiveBroadcastExec is broken in > the sense that buildPlan : LogicalPlan is not canonicalized which causes > batchscans to differ when reuse of exchange check happens in AQE. > Moreover the equivalence of SubqueryAdaptiveBroadcastExec and > SubqueryBroadcastExec is not there which also aggravates the re-use of > exchange in aqe broken. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Closed] (SPARK-45925) SubqueryBroadcastExec is not equivalent with SubqueryAdaptiveBroadcastExec causing re-use of exchange not happening in AQE
[ https://issues.apache.org/jira/browse/SPARK-45925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif closed SPARK-45925. this is not an issue > SubqueryBroadcastExec is not equivalent with SubqueryAdaptiveBroadcastExec > causing re-use of exchange not happening in AQE > -- > > Key: SPARK-45925 > URL: https://issues.apache.org/jira/browse/SPARK-45925 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.1 >Reporter: Asif >Priority: Major > Labels: pull-request-available > > A created stage may contain SubqueryAdaptiveBroadcastExec while incominng > exchange may contain SubqueryBroadcastExec and though they are equivalent , > the match does not happen because equals/hashCode do not match , resulting in > non re-use of exchange. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-45924) Canonicalization of SubqueryAdaptiveBroadcastExec is broken and is not equivalent with SubqueryBroadcastExec
[ https://issues.apache.org/jira/browse/SPARK-45924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif resolved SPARK-45924. -- Resolution: Not A Bug > Canonicalization of SubqueryAdaptiveBroadcastExec is broken and is not > equivalent with SubqueryBroadcastExec > > > Key: SPARK-45924 > URL: https://issues.apache.org/jira/browse/SPARK-45924 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.1 >Reporter: Asif >Priority: Major > Labels: pull-request-available > > while writing bug test for > [SPARK-45866|https://issues.apache.org/jira/projects/SPARK/issues/SPARK-45866], > found that canonicalization of SubqueryAdaptiveBroadcastExec is broken in > the sense that buildPlan : LogicalPlan is not canonicalized which causes > batchscans to differ when reuse of exchange check happens in AQE. > Moreover the equivalence of SubqueryAdaptiveBroadcastExec and > SubqueryBroadcastExec is not there which also aggravates the re-use of > exchange in aqe broken. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Resolved] (SPARK-45925) SubqueryBroadcastExec is not equivalent with SubqueryAdaptiveBroadcastExec causing re-use of exchange not happening in AQE
[ https://issues.apache.org/jira/browse/SPARK-45925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif resolved SPARK-45925. -- Resolution: Not A Problem > SubqueryBroadcastExec is not equivalent with SubqueryAdaptiveBroadcastExec > causing re-use of exchange not happening in AQE > -- > > Key: SPARK-45925 > URL: https://issues.apache.org/jira/browse/SPARK-45925 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.1 >Reporter: Asif >Priority: Major > Labels: pull-request-available > > A created stage may contain SubqueryAdaptiveBroadcastExec while incominng > exchange may contain SubqueryBroadcastExec and though they are equivalent , > the match does not happen because equals/hashCode do not match , resulting in > non re-use of exchange. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-45866) Reuse of exchange in AQE does not happen when run time filters are pushed down to the underlying Scan ( like iceberg )
[ https://issues.apache.org/jira/browse/SPARK-45866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17786155#comment-17786155 ] Asif commented on SPARK-45866: -- Now that the other PRs on which this ticket itself is dependent are created, I will open a PR with bug test tomorrow. Ofcourse the bugtest itself will fail till the master contains all the dependent PRs > Reuse of exchange in AQE does not happen when run time filters are pushed > down to the underlying Scan ( like iceberg ) > -- > > Key: SPARK-45866 > URL: https://issues.apache.org/jira/browse/SPARK-45866 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.1 >Reporter: Asif >Priority: Major > > In certain types of queries for eg TPCDS Query 14b, the reuse of exchange > does not happen in AQE , resulting in perf degradation. > The spark TPCDS tests are unable to catch the problem, because the > InMemoryScan used for testing do not implement the equals & hashCode > correctly , in the sense , that they do take into account the pushed down run > time filters. > In concrete Scan implementations, for eg iceberg's SparkBatchQueryScan , the > equality check , apart from other things, also involves Runtime Filters > pushed ( which is correct). > In spark the issue is this: > For a given stage being materialized, just before materialization starts, > the run time filters are confined to the BatchScanExec level. > Only when the actual RDD corresponding to the BatchScanExec, is being > evaluated, do the runtime filters get pushed to the underlying Scan. > Now if a new stage is created and it checks in the stageCache using its > canonicalized plan to see if a stage can be reused, it fails to find the > r-usable stage even if the stage exists, because the canonicalized spark > plan present in the stage cache, has now the run time filters pushed to the > Scan , so the incoming canonicalized spark plan does not match the key as > their underlying scans differ . that is incoming spark plan's scan does not > have runtime filters , while the canonicalized spark plan present as key in > the stage cache has the scan with runtime filters pushed. > The fix as I have worked is to provide, two methods in the > SupportsRuntimeV2Filtering interface , > default boolean equalToIgnoreRuntimeFilters(Scan other) { > return this.equals(other); > } > default int hashCodeIgnoreRuntimeFilters() { > return this.hashCode(); > } > In the BatchScanExec, if the scan implements SupportsRuntimeV2Filtering, then > instead of batch.equals, it should call scan.equalToIgnoreRuntimeFilters > And the underlying Scan implementations should provide equality which > excludes run time filters. > Similarly the hashCode of BatchScanExec, should use > scan.hashCodeIgnoreRuntimeFilters instead of ( batch.hashCode). > Will be creating a PR with bug test for review. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45925) SubqueryBroadcastExec is not equivalent with SubqueryAdaptiveBroadcastExec causing re-use of exchange not happening in AQE
[ https://issues.apache.org/jira/browse/SPARK-45925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-45925: - Labels: pull-request-available (was: ) > SubqueryBroadcastExec is not equivalent with SubqueryAdaptiveBroadcastExec > causing re-use of exchange not happening in AQE > -- > > Key: SPARK-45925 > URL: https://issues.apache.org/jira/browse/SPARK-45925 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.1 >Reporter: Asif >Priority: Major > Labels: pull-request-available > > A created stage may contain SubqueryAdaptiveBroadcastExec while incominng > exchange may contain SubqueryBroadcastExec and though they are equivalent , > the match does not happen because equals/hashCode do not match , resulting in > non re-use of exchange. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45924) Canonicalization of SubqueryAdaptiveBroadcastExec is broken and is not equivalent with SubqueryBroadcastExec
[ https://issues.apache.org/jira/browse/SPARK-45924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-45924: - Labels: pull-request-available (was: ) > Canonicalization of SubqueryAdaptiveBroadcastExec is broken and is not > equivalent with SubqueryBroadcastExec > > > Key: SPARK-45924 > URL: https://issues.apache.org/jira/browse/SPARK-45924 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.1 >Reporter: Asif >Priority: Major > Labels: pull-request-available > > while writing bug test for > [SPARK-45866|https://issues.apache.org/jira/projects/SPARK/issues/SPARK-45866], > found that canonicalization of SubqueryAdaptiveBroadcastExec is broken in > the sense that buildPlan : LogicalPlan is not canonicalized which causes > batchscans to differ when reuse of exchange check happens in AQE. > Moreover the equivalence of SubqueryAdaptiveBroadcastExec and > SubqueryBroadcastExec is not there which also aggravates the re-use of > exchange in aqe broken. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-45926) The InMemoryV2FilterBatchScan and InMemoryBatchScan are not implementing equals and hashCode correctly
Asif created SPARK-45926: Summary: The InMemoryV2FilterBatchScan and InMemoryBatchScan are not implementing equals and hashCode correctly Key: SPARK-45926 URL: https://issues.apache.org/jira/browse/SPARK-45926 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.5.1 Reporter: Asif The InMemoryV2FilterBatchScan and InMemoryBatchScan test classes are not implementing hashCode and equals correctly, as they do not take into account the pushed runtime filters. As a result they are unable to expose, via the TPCDS tests, whether the reuse of exchange is happening correctly or not. If these classes implemented equals and hashCode taking the pushed runtime filters into account, we would see that TPCDS Q14b, which should ideally be reusing the exchange containing the Union, is not doing so due to multiple bugs which surface in AQE. Actual V2 DataSources like iceberg correctly implement equals and hashCode taking pushed runtime filters into account, which also exposes the same issue of exchange reuse not happening -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
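To make the equals/hashCode requirement concrete, here is a toy scan whose equality reflects the runtime filters pushed through SupportsRuntimeV2Filtering. The class name and fields are illustrative (the actual test classes named in the ticket live in Spark's DataSource V2 test sources); the point is only that two scans holding different pushed filters must not compare equal, otherwise exchange-reuse bugs stay hidden in tests.
{code}
import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.connector.expressions.filter.Predicate
import org.apache.spark.sql.connector.read.{Scan, SupportsRuntimeV2Filtering}
import org.apache.spark.sql.types.StructType

class IllustrativeInMemoryScan(val schema: StructType)
  extends Scan with SupportsRuntimeV2Filtering {

  private var pushedRuntimeFilters: Seq[Predicate] = Seq.empty

  override def readSchema(): StructType = schema
  override def filterAttributes(): Array[NamedReference] = Array.empty
  override def filter(predicates: Array[Predicate]): Unit = {
    pushedRuntimeFilters = predicates.toSeq
  }

  // Equality and hashCode take the pushed runtime filters into account,
  // mirroring what real sources such as iceberg's SparkBatchQueryScan do.
  override def equals(other: Any): Boolean = other match {
    case o: IllustrativeInMemoryScan =>
      schema == o.schema && pushedRuntimeFilters == o.pushedRuntimeFilters
    case _ => false
  }
  override def hashCode(): Int = (schema, pushedRuntimeFilters).hashCode()
}
{code}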
[jira] [Created] (SPARK-45925) SubqueryBroadcastExec is not equivalent with SubqueryAdaptiveBroadcastExec causing re-use of exchange not happening in AQE
Asif created SPARK-45925: Summary: SubqueryBroadcastExec is not equivalent with SubqueryAdaptiveBroadcastExec causing re-use of exchange not happening in AQE Key: SPARK-45925 URL: https://issues.apache.org/jira/browse/SPARK-45925 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.5.1 Reporter: Asif A created stage may contain SubqueryAdaptiveBroadcastExec while the incoming exchange may contain SubqueryBroadcastExec; although the two are equivalent, the match does not happen because equals/hashCode do not match, resulting in the exchange not being reused. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] (SPARK-45658) Canonicalization of DynamicPruningSubquery is broken
[ https://issues.apache.org/jira/browse/SPARK-45658 ] Asif deleted comment on SPARK-45658: -- was (Author: ashahid7): I also think that during canonicalization of DynamicPruningSubquery, the pruning key's canonicalization should be done on the basis of the enclosing Plan which contains the DynamicPruningSubquery Expression > Canonicalization of DynamicPruningSubquery is broken > > > Key: SPARK-45658 > URL: https://issues.apache.org/jira/browse/SPARK-45658 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.0 >Reporter: Asif >Priority: Major > Labels: pull-request-available > > The canonicalization of (buildKeys: Seq[Expression]) in the class > DynamicPruningSubquery is broken, as the buildKeys are canonicalized just by > calling > buildKeys.map(_.canonicalized) > The above would result in incorrect canonicalization as it would not be > normalizing the exprIds relative to buildQuery output > The fix is to use the buildQuery : LogicalPlan's output to normalize the > buildKeys expression > as given below, using the standard approach. > buildKeys.map(QueryPlan.normalizeExpressions(_, buildQuery.output)), > Will be filing a PR and bug test for the same. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45924) Canonicalization of SubqueryAdaptiveBroadcastExec is broken and is not equivalent with SubqueryBroadcastExec
[ https://issues.apache.org/jira/browse/SPARK-45924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-45924: - Description: while writing bug test for [SPARK-45866|https://issues.apache.org/jira/projects/SPARK/issues/SPARK-45866], found that canonicalization of SubqueryAdaptiveBroadcastExec is broken in the sense that buildPlan : LogicalPlan is not canonicalized which causes batchscans to differ when reuse of exchange check happens in AQE. Moreover the equivalence of SubqueryAdaptiveBroadcastExec and SubqueryBroadcastExec is not there which also aggravates the re-use of exchange in aqe broken. was: while writing bug test for [SPARK-45866|http://example.com], found that canonicalization of SubqueryAdaptiveBroadcastExec is broken in the sense that buildPlan : LogicalPlan is not canonicalized which causes batchscans to differ when reuse of exchange check happens in AQE. Moreover the equivalence of SubqueryAdaptiveBroadcastExec and SubqueryBroadcastExec is not there which also aggravates the re-use of exchange in aqe broken. > Canonicalization of SubqueryAdaptiveBroadcastExec is broken and is not > equivalent with SubqueryBroadcastExec > > > Key: SPARK-45924 > URL: https://issues.apache.org/jira/browse/SPARK-45924 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.1 >Reporter: Asif >Priority: Major > > while writing bug test for > [SPARK-45866|https://issues.apache.org/jira/projects/SPARK/issues/SPARK-45866], > found that canonicalization of SubqueryAdaptiveBroadcastExec is broken in > the sense that buildPlan : LogicalPlan is not canonicalized which causes > batchscans to differ when reuse of exchange check happens in AQE. > Moreover the equivalence of SubqueryAdaptiveBroadcastExec and > SubqueryBroadcastExec is not there which also aggravates the re-use of > exchange in aqe broken. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-45924) Canonicalization of SubqueryAdaptiveBroadcastExec is broken and is not equivalent with SubqueryBroadcastExec
Asif created SPARK-45924: Summary: Canonicalization of SubqueryAdaptiveBroadcastExec is broken and is not equivalent with SubqueryBroadcastExec Key: SPARK-45924 URL: https://issues.apache.org/jira/browse/SPARK-45924 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.5.1 Reporter: Asif while writing bug test for [SPARK-45866|http://example.com], found that canonicalization of SubqueryAdaptiveBroadcastExec is broken in the sense that buildPlan : LogicalPlan is not canonicalized which causes batchscans to differ when reuse of exchange check happens in AQE. Moreover the equivalence of SubqueryAdaptiveBroadcastExec and SubqueryBroadcastExec is not there which also aggravates the re-use of exchange in aqe broken. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45373) Minimizing calls to HiveMetaStore layer for getting partitions, when tables are repeated
[ https://issues.apache.org/jira/browse/SPARK-45373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-45373: - Shepherd: (was: Peter Toth) > Minimizing calls to HiveMetaStore layer for getting partitions, when tables > are repeated > - > > Key: SPARK-45373 > URL: https://issues.apache.org/jira/browse/SPARK-45373 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.5.0 >Reporter: Asif >Priority: Minor > Labels: pull-request-available > Fix For: 3.5.1 > > > In the rule PruneFileSourcePartitions where the CatalogFileIndex gets > converted to InMemoryFileIndex, the HMS calls can get very expensive if : > 1) The translated filter string for push down to HMS layer becomes empty , > resulting in fetching of all partitions and same table is referenced multiple > times in the query. > 2) Or just in case same table is referenced multiple times in the query with > different partition filters. > In such cases current code would result in multiple calls to HMS layer. > This can be avoided by grouping the tables based on CatalogFileIndex and > passing a common minimum filter ( filter1 || filter2) and getting a base > PrunedInmemoryFileIndex which can become a basis for each of the specific > table. > Opened following PR for ticket: > [SPARK-45373-PR|https://github.com/apache/spark/pull/43183] -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
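A rough sketch, under assumed names, of the grouping described above: scansInPlan pairs each scan's CatalogFileIndex with its partition filter and is a placeholder for however the rule collects them, while CatalogFileIndex.filterPartitions is the existing API whose invocation reaches the metastore. The idea is one metastore round trip per table, using the least restrictive combined filter (filter1 || filter2 || ...), with the result then specialized per scan.
{code}
import org.apache.spark.sql.catalyst.expressions.{Expression, Or}
import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, InMemoryFileIndex}

// Group scans of the same catalog table and prune partitions once per table.
def pruneOnce(
    scansInPlan: Seq[(CatalogFileIndex, Expression)]): Map[CatalogFileIndex, InMemoryFileIndex] =
  scansInPlan
    .groupBy(_._1)
    .map { case (index, scans) =>
      val combined = scans.map(_._2).reduce[Expression](Or(_, _))
      index -> index.filterPartitions(Seq(combined))
    }
{code}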
[jira] [Updated] (SPARK-33152) SPIP: Constraint Propagation code causes OOM issues or increasing compilation time to hours
[ https://issues.apache.org/jira/browse/SPARK-33152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-33152: - Affects Version/s: 3.5.0 (was: 2.4.0) (was: 3.0.1) (was: 3.1.2) > SPIP: Constraint Propagation code causes OOM issues or increasing compilation > time to hours > --- > > Key: SPARK-33152 > URL: https://issues.apache.org/jira/browse/SPARK-33152 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.5.0 >Reporter: Asif >Priority: Major > Labels: SPIP > Original Estimate: 168h > Remaining Estimate: 168h > > h2. Q1. What are you trying to do? Articulate your objectives using > absolutely no jargon. > Proposing new algorithm to create, store and use constraints for removing > redundant filters & inferring new filters. > The current algorithm has subpar performance in complex expression scenarios > involving aliases( with certain use cases the compilation time can go into > hours), potential to cause OOM, may miss removing redundant filters in > different scenarios, may miss creating IsNotNull constraints in different > scenarios, does not push compound predicates in Join. > # This issue if not fixed can cause OutOfMemory issue or unacceptable query > compilation times. > Have added a test "plan equivalence with case statements and performance > comparison with benefit of more than 10x conservatively" in > org.apache.spark.sql.catalyst.plans.OptimizedConstraintPropagationSuite. > *With this PR the compilation time is 247 ms vs 13958 ms without the change* > # It is more effective in filter pruning as is evident in some of the tests > in org.apache.spark.sql.catalyst.plans.OptimizedConstraintPropagationSuite > where current code is not able to identify the redundant filter in some cases. > # It is able to generate a better optimized plan for join queries as it can > push compound predicates. > # The current logic can miss a lot of possible cases of removing redundant > predicates, as it fails to take into account if same attribute or its aliases > are repeated multiple times in a complex expression. > # There are cases where some of the optimizer rules involving removal of > redundant predicates fail to remove on the basis of constraint data. In some > cases the rule works, just by the virtue of previous rules helping it out to > cover the inaccuracy. That the ConstraintPropagation rule & its function of > removal of redundant filters & addition of new inferred filters is dependent > on the working of some of the other unrelated previous optimizer rules is > behaving, is indicative of issues. > # It does away with all the EqualNullSafe constraints as this logic does not > need those constraints to be created. > # There is at least one test in existing ConstraintPropagationSuite which is > missing a IsNotNull constraints because the code incorrectly generated a > EqualsNullSafeConstraint instead of EqualTo constraint, when using the > existing Constraints code. With these changes, the test correctly creates an > EqualTo constraint, resulting in an inferred IsNotNull constraint > # It does away with the current combinatorial logic of evaluation all the > constraints can cause compilation to run into hours or cause OOM. The number > of constraints stored is exactly the same as the number of filters encountered > h2. Q2. What problem is this proposal NOT designed to solve? 
> It mainly focuses on compile time performance, but in some cases can benefit > run time characteristics too, like inferring IsNotNull filter or pushing down > compound predicates on the join, which currently may get missed/ does not > happen , respectively, by the present code. > h2. Q3. How is it done today, and what are the limits of current practice? > Current ConstraintsPropagation code, pessimistically tries to generates all > the possible combinations of constraints , based on the aliases ( even then > it may miss a lot of combinations if the expression is a complex expression > involving same attribute repeated multiple times within the expression and > there are many aliases to that column). There are query plans in our > production env, which can result in intermediate number of constraints going > into hundreds of thousands, causing OOM or taking time running into hours. > Also there are cases where it incorrectly generates an EqualNullSafe > constraint instead of EqualTo constraint , thus missing a possible IsNull > constraint on column. > Also it only pushes single column predicate on the other side of the join. > The constraints generated , in some cases, are missing the required ones, and > the plan
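To make the alias blow-up described in the SPIP text concrete, here is a small, made-up DataFrame in its spirit (column names and counts are purely illustrative): the column "a" is aliased several times and also referenced more than once inside a CASE expression, and with the existing pessimistic expansion every combination of alias substitutions becomes a candidate constraint, which is what grows compilation time so sharply.
{code}
import org.apache.spark.sql.functions.{col, when}

// "a" has three aliases and appears twice inside the CASE expression; the
// current constraint propagation enumerates constraint variants for every
// combination of alias substitutions.
val df = spark.range(10).toDF("a")
  .select(col("a"), col("a").as("a1"), col("a").as("a2"), col("a").as("a3"),
    when(col("a") > 1, col("a") * 2).otherwise(col("a") * 3).as("c"))
  .where(col("c") > 0)
{code}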
[jira] [Updated] (SPARK-45373) Minimizing calls to HiveMetaStore layer for getting partitions, when tables are repeated
[ https://issues.apache.org/jira/browse/SPARK-45373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-45373: - Affects Version/s: 3.5.0 (was: 4.0.0) > Minimizing calls to HiveMetaStore layer for getting partitions, when tables > are repeated > - > > Key: SPARK-45373 > URL: https://issues.apache.org/jira/browse/SPARK-45373 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.5.0 >Reporter: Asif >Priority: Minor > Labels: pull-request-available > Fix For: 3.5.1 > > > In the rule PruneFileSourcePartitions where the CatalogFileIndex gets > converted to InMemoryFileIndex, the HMS calls can get very expensive if : > 1) The translated filter string for push down to HMS layer becomes empty , > resulting in fetching of all partitions and same table is referenced multiple > times in the query. > 2) Or just in case same table is referenced multiple times in the query with > different partition filters. > In such cases current code would result in multiple calls to HMS layer. > This can be avoided by grouping the tables based on CatalogFileIndex and > passing a common minimum filter ( filter1 || filter2) and getting a base > PrunedInmemoryFileIndex which can become a basis for each of the specific > table. > Opened following PR for ticket: > [SPARK-45373-PR|https://github.com/apache/spark/pull/43183] -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns
[ https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-44662: - Affects Version/s: 3.5.0 (was: 3.5.1) > SPIP: Improving performance of BroadcastHashJoin queries with stream side > join key on non partition columns > --- > > Key: SPARK-44662 > URL: https://issues.apache.org/jira/browse/SPARK-44662 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.5.0 >Reporter: Asif >Priority: Major > Labels: pull-request-available > Attachments: perf results broadcast var pushdown - Partitioned > TPCDS.pdf > > > h2. *Q1. What are you trying to do? Articulate your objectives using > absolutely no jargon.* > On the lines of DPP which helps DataSourceV2 relations when the joining key > is a partition column, the same concept can be extended over to the case > where joining key is not a partition column. > The Keys of BroadcastHashJoin are already available before actual evaluation > of the stream iterator. These keys can be pushed down to the DataSource as a > SortedSet. > For non partition columns, the DataSources like iceberg have max/min stats on > column available at manifest level, and for formats like parquet , they have > max/min stats at various storage level. The passed SortedSet can be used to > prune using ranges at both driver level ( manifests files) as well as > executor level ( while actually going through chunks , row groups etc at > parquet level) > If the data is stored as Columnar Batch format , then it would not be > possible to filter out individual row at DataSource level, even though we > have keys. > But at the scan level, ( ColumnToRowExec) it is still possible to filter out > as many rows as possible , if the query involves nested joins. Thus reducing > the number of rows to join at the higher join levels. > Will be adding more details.. > h2. *Q2. What problem is this proposal NOT designed to solve?* > This can only help in BroadcastHashJoin's performance if the join is Inner or > Left Semi. > This will also not work if there are nodes like Expand, Generator , Aggregate > (without group by on keys not part of joining column etc) below the > BroadcastHashJoin node being targeted. > h2. *Q3. How is it done today, and what are the limits of current practice?* > Currently this sort of pruning at DataSource level is being done using DPP > (Dynamic Partition Pruning ) and IFF one of the join key column is a > Partitioning column ( so that cost of DPP query is justified and way less > than amount of data it will be filtering by skipping partitions). > The limitation is that DPP type approach is not implemented ( intentionally I > believe), if the join column is a non partition column ( because of cost of > "DPP type" query would most likely be way high as compared to any possible > pruning ( especially if the column is not stored in a sorted manner). > h2. *Q4. What is new in your approach and why do you think it will be > successful?* > 1) This allows pruning on non partition column based joins. > 2) Because it piggy backs on Broadcasted Keys, there is no extra cost of "DPP > type" query. > 3) The Data can be used by DataSource to prune at driver (possibly) and also > at executor level ( as in case of parquet which has max/min at various > structure levels) > 4) The big benefit should be seen in multilevel nested join queries. In the > current code base, if I am correct, only one join's pruning filter would get > pushed at scan level. 
Since it is on partition key may be that is sufficient. > But if it is a nested Join query , and may be involving different columns on > streaming side for join, each such filter push could do significant pruning. > This requires some handling in case of AQE, as the stream side iterator ( & > hence stage evaluation needs to be delayed, till all the available join > filters in the nested tree are pushed at their respective target > BatchScanExec). > h4. *Single Row Filteration* > 5) In case of nested broadcasted joins, if the datasource is column vector > oriented , then what spark would get is a ColumnarBatch. But because scans > have Filters from multiple joins, they can be retrieved and can be applied in > code generated at ColumnToRowExec level, using a new "containsKey" method on > HashedRelation. Thus only those rows which satisfy all the > BroadcastedHashJoins ( whose keys have been pushed) , will be used for join > evaluation. > The code is already there , the PR on spark side is > [spark-broadcast-var|https://github.com/apache/spark/pull/43373]. For non > partition table TPCDS run on laptop with TPCDS data size of ( scale factor
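A hedged sketch of the "single row filteration" idea above. HashedRelation and InternalRow are real Spark internals, but containsKey is the method the SPIP proposes to add; the sketch approximates it with the existing get(...) call, and joinKeyOf is a placeholder for however the stream-side join key is extracted, so this is only the shape of the idea rather than the actual generated code.
{code}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.joins.HashedRelation

// Drop stream-side rows whose join key is absent from the broadcast build side
// before they reach the join (and any joins above it in a nested join tree).
def pruneStreamSide(
    rows: Iterator[InternalRow],
    buildSide: HashedRelation,
    joinKeyOf: InternalRow => InternalRow): Iterator[InternalRow] =
  rows.filter(row => buildSide.get(joinKeyOf(row)) != null)
{code}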
[jira] [Created] (SPARK-45866) Reuse of exchange in AQE does not happen when run time filters are pushed down to the underlying Scan ( like iceberg )
Asif created SPARK-45866: Summary: Reuse of exchange in AQE does not happen when run time filters are pushed down to the underlying Scan ( like iceberg ) Key: SPARK-45866 URL: https://issues.apache.org/jira/browse/SPARK-45866 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.5.1 Reporter: Asif In certain types of queries for eg TPCDS Query 14b, the reuse of exchange does not happen in AQE , resulting in perf degradation. The spark TPCDS tests are unable to catch the problem, because the InMemoryScan used for testing do not implement the equals & hashCode correctly , in the sense , that they do take into account the pushed down run time filters. In concrete Scan implementations, for eg iceberg's SparkBatchQueryScan , the equality check , apart from other things, also involves Runtime Filters pushed ( which is correct). In spark the issue is this: For a given stage being materialized, just before materialization starts, the run time filters are confined to the BatchScanExec level. Only when the actual RDD corresponding to the BatchScanExec, is being evaluated, do the runtime filters get pushed to the underlying Scan. Now if a new stage is created and it checks in the stageCache using its canonicalized plan to see if a stage can be reused, it fails to find the r-usable stage even if the stage exists, because the canonicalized spark plan present in the stage cache, has now the run time filters pushed to the Scan , so the incoming canonicalized spark plan does not match the key as their underlying scans differ . that is incoming spark plan's scan does not have runtime filters , while the canonicalized spark plan present as key in the stage cache has the scan with runtime filters pushed. The fix as I have worked is to provide, two methods in the SupportsRuntimeV2Filtering interface , default boolean equalToIgnoreRuntimeFilters(Scan other) { return this.equals(other); } default int hashCodeIgnoreRuntimeFilters() { return this.hashCode(); } In the BatchScanExec, if the scan implements SupportsRuntimeV2Filtering, then instead of batch.equals, it should call scan.equalToIgnoreRuntimeFilters And the underlying Scan implementations should provide equality which excludes run time filters. Similarly the hashCode of BatchScanExec, should use scan.hashCodeIgnoreRuntimeFilters instead of ( batch.hashCode). Will be creating a PR with bug test for review. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-45658) Canonicalization of DynamicPruningSubquery is broken
[ https://issues.apache.org/jira/browse/SPARK-45658?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17784567#comment-17784567 ] Asif commented on SPARK-45658: -- I also think that during canonicalization of DynamicPruningSubquery, the pruning key's canonicalization should be done on the basis of the enclosing Plan which contains the DynamicPruningSubquery Expression > Canonicalization of DynamicPruningSubquery is broken > > > Key: SPARK-45658 > URL: https://issues.apache.org/jira/browse/SPARK-45658 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.0 >Reporter: Asif >Priority: Major > Labels: pull-request-available > > The canonicalization of (buildKeys: Seq[Expression]) in the class > DynamicPruningSubquery is broken, as the buildKeys are canonicalized just by > calling > buildKeys.map(_.canonicalized) > The above would result in incorrect canonicalization as it would not be > normalizing the exprIds relative to buildQuery output > The fix is to use the buildQuery : LogicalPlan's output to normalize the > buildKeys expression > as given below, using the standard approach. > buildKeys.map(QueryPlan.normalizeExpressions(_, buildQuery.output)), > Will be filing a PR and bug test for the same. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
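A short sketch of the fix quoted in the description above: instead of calling .canonicalized on each build key directly, normalize them against buildQuery.output so that exprIds become position-based and two semantically equal subqueries compare equal. QueryPlan.normalizeExpressions is the existing helper; buildKeys and buildQuery are assumed to be the DynamicPruningSubquery fields in scope.
{code}
import org.apache.spark.sql.catalyst.plans.QueryPlan

// Position-based normalization relative to the build query's output, the same
// approach other plan nodes use when canonicalizing their expressions.
val canonicalBuildKeys =
  buildKeys.map(QueryPlan.normalizeExpressions(_, buildQuery.output))
{code}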
[jira] [Commented] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns
[ https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17784282#comment-17784282 ] Asif commented on SPARK-44662: -- The changes for iceberg which support broadcast-var-pushdown are present in the git repo: [iceberg-repo|https://github.com/ahshahid/iceberg.git] branch : broadcastvar-push. The changes done in the iceberg branch are compatible with latest apache/spark master ( identified as 3.5 to iceberg) and tested and compiled using scala 2.13. To get the iceberg-spark-run-time jar for use: First locally install the spark jars using the PR of spark mentioned below. (./build/mvn clean install -Phive -Phive-thriftserver -DskipTests) Then use the iceberg branch broadcastvar-push to create the iceberg spark runtime jar such that it uses the locally installed spark as dependency. In case you are interested in evaluating performance, pls let me know. > SPIP: Improving performance of BroadcastHashJoin queries with stream side > join key on non partition columns > --- > > Key: SPARK-44662 > URL: https://issues.apache.org/jira/browse/SPARK-44662 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.5.1 >Reporter: Asif >Priority: Major > Labels: pull-request-available > Attachments: perf results broadcast var pushdown - Partitioned > TPCDS.pdf > > > h2. *Q1. What are you trying to do? Articulate your objectives using > absolutely no jargon.* > On the lines of DPP which helps DataSourceV2 relations when the joining key > is a partition column, the same concept can be extended over to the case > where joining key is not a partition column. > The Keys of BroadcastHashJoin are already available before actual evaluation > of the stream iterator. These keys can be pushed down to the DataSource as a > SortedSet. > For non partition columns, the DataSources like iceberg have max/min stats on > column available at manifest level, and for formats like parquet , they have > max/min stats at various storage level. The passed SortedSet can be used to > prune using ranges at both driver level ( manifests files) as well as > executor level ( while actually going through chunks , row groups etc at > parquet level) > If the data is stored as Columnar Batch format , then it would not be > possible to filter out individual row at DataSource level, even though we > have keys. > But at the scan level, ( ColumnToRowExec) it is still possible to filter out > as many rows as possible , if the query involves nested joins. Thus reducing > the number of rows to join at the higher join levels. > Will be adding more details.. > h2. *Q2. What problem is this proposal NOT designed to solve?* > This can only help in BroadcastHashJoin's performance if the join is Inner or > Left Semi. > This will also not work if there are nodes like Expand, Generator , Aggregate > (without group by on keys not part of joining column etc) below the > BroadcastHashJoin node being targeted. > h2. *Q3. How is it done today, and what are the limits of current practice?* > Currently this sort of pruning at DataSource level is being done using DPP > (Dynamic Partition Pruning ) and IFF one of the join key column is a > Partitioning column ( so that cost of DPP query is justified and way less > than amount of data it will be filtering by skipping partitions). 
> The limitation is that DPP type approach is not implemented ( intentionally I > believe), if the join column is a non partition column ( because of cost of > "DPP type" query would most likely be way high as compared to any possible > pruning ( especially if the column is not stored in a sorted manner). > h2. *Q4. What is new in your approach and why do you think it will be > successful?* > 1) This allows pruning on non partition column based joins. > 2) Because it piggy backs on Broadcasted Keys, there is no extra cost of "DPP > type" query. > 3) The Data can be used by DataSource to prune at driver (possibly) and also > at executor level ( as in case of parquet which has max/min at various > structure levels) > 4) The big benefit should be seen in multilevel nested join queries. In the > current code base, if I am correct, only one join's pruning filter would get > pushed at scan level. Since it is on partition key may be that is sufficient. > But if it is a nested Join query , and may be involving different columns on > streaming side for join, each such filter push could do significant pruning. > This requires some handling in case of AQE, as the stream side iterator ( & > hence stage evaluation needs to be delayed, till all the available join > filters in the nested tree are pushed at their respective target >
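As an aside on the "push the broadcast keys as a SortedSet" idea described in this update, here is a small, self-contained sketch. PushedBroadcastKeys and fromBuildSide are illustrative names, not Spark or Iceberg APIs; the point is only that an ordered key set supports cheap min/max overlap checks against file- or manifest-level column statistics.

{code:scala}
import scala.collection.immutable.SortedSet

// Illustrative container for the build-side join key values, collected once the
// broadcast relation is materialized and kept ordered for range checks.
final case class PushedBroadcastKeys[T](keys: SortedSet[T]) {
  require(keys.nonEmpty, "no point pushing an empty key set")

  def minKey: T = keys.head
  def maxKey: T = keys.last

  // Can any pushed key fall inside a file's / manifest's [lower, upper] stats range?
  // If not, the whole file (or row group) can be skipped.
  def overlaps(lower: T, upper: T): Boolean = {
    val it = keys.iteratorFrom(lower)          // first key >= lower
    it.hasNext && keys.ordering.lteq(it.next(), upper)
  }
}

object PushedBroadcastKeys {
  // Sketch: the keys are already available on the driver once the broadcast side
  // has been built, so collecting them adds no extra query the way DPP does.
  def fromBuildSide(keyValues: Iterator[Long]): PushedBroadcastKeys[Long] =
    PushedBroadcastKeys(SortedSet.empty[Long] ++ keyValues)
}
{code}

A data source that keeps per-file min/max column statistics could then drop a file whenever overlaps(fileMin, fileMax) is false, at the driver for manifests and at the executor for finer-grained units such as row groups.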
[jira] [Commented] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns
[ https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17782927#comment-17782927 ] Asif commented on SPARK-44662: -- The majority of file changes are due to additional tpcds tests for iceberg. These will not be included as such in final PR > SPIP: Improving performance of BroadcastHashJoin queries with stream side > join key on non partition columns > --- > > Key: SPARK-44662 > URL: https://issues.apache.org/jira/browse/SPARK-44662 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.5.1 >Reporter: Asif >Priority: Major > Labels: pull-request-available > Attachments: perf results broadcast var pushdown - Partitioned > TPCDS.pdf > > > h2. *Q1. What are you trying to do? Articulate your objectives using > absolutely no jargon.* > On the lines of DPP which helps DataSourceV2 relations when the joining key > is a partition column, the same concept can be extended over to the case > where joining key is not a partition column. > The Keys of BroadcastHashJoin are already available before actual evaluation > of the stream iterator. These keys can be pushed down to the DataSource as a > SortedSet. > For non partition columns, the DataSources like iceberg have max/min stats on > column available at manifest level, and for formats like parquet , they have > max/min stats at various storage level. The passed SortedSet can be used to > prune using ranges at both driver level ( manifests files) as well as > executor level ( while actually going through chunks , row groups etc at > parquet level) > If the data is stored as Columnar Batch format , then it would not be > possible to filter out individual row at DataSource level, even though we > have keys. > But at the scan level, ( ColumnToRowExec) it is still possible to filter out > as many rows as possible , if the query involves nested joins. Thus reducing > the number of rows to join at the higher join levels. > Will be adding more details.. > h2. *Q2. What problem is this proposal NOT designed to solve?* > This can only help in BroadcastHashJoin's performance if the join is Inner or > Left Semi. > This will also not work if there are nodes like Expand, Generator , Aggregate > (without group by on keys not part of joining column etc) below the > BroadcastHashJoin node being targeted. > h2. *Q3. How is it done today, and what are the limits of current practice?* > Currently this sort of pruning at DataSource level is being done using DPP > (Dynamic Partition Pruning ) and IFF one of the join key column is a > Partitioning column ( so that cost of DPP query is justified and way less > than amount of data it will be filtering by skipping partitions). > The limitation is that DPP type approach is not implemented ( intentionally I > believe), if the join column is a non partition column ( because of cost of > "DPP type" query would most likely be way high as compared to any possible > pruning ( especially if the column is not stored in a sorted manner). > h2. *Q4. What is new in your approach and why do you think it will be > successful?* > 1) This allows pruning on non partition column based joins. > 2) Because it piggy backs on Broadcasted Keys, there is no extra cost of "DPP > type" query. > 3) The Data can be used by DataSource to prune at driver (possibly) and also > at executor level ( as in case of parquet which has max/min at various > structure levels) > 4) The big benefit should be seen in multilevel nested join queries. 
In the > current code base, if I am correct, only one join's pruning filter would get > pushed at scan level. Since it is on partition key may be that is sufficient. > But if it is a nested Join query , and may be involving different columns on > streaming side for join, each such filter push could do significant pruning. > This requires some handling in case of AQE, as the stream side iterator ( & > hence stage evaluation needs to be delayed, till all the available join > filters in the nested tree are pushed at their respective target > BatchScanExec). > h4. *Single Row Filteration* > 5) In case of nested broadcasted joins, if the datasource is column vector > oriented , then what spark would get is a ColumnarBatch. But because scans > have Filters from multiple joins, they can be retrieved and can be applied in > code generated at ColumnToRowExec level, using a new "containsKey" method on > HashedRelation. Thus only those rows which satisfy all the > BroadcastedHashJoins ( whose keys have been pushed) , will be used for join > evaluation. > The code is already there , the PR on spark side is >
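Since the description above leans on a new "containsKey" method on HashedRelation applied in the code generated at the ColumnarToRow boundary, here is a rough, hedged sketch of what that per-row check amounts to. BroadcastKeyMembership stands in for the proposed HashedRelation addition, and filterRows is illustrative, not the actual generated code.

{code:scala}
import scala.jdk.CollectionConverters._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.vectorized.ColumnarBatch

// Stand-in for the proposed HashedRelation.containsKey(...) described above.
trait BroadcastKeyMembership {
  def containsKey(key: InternalRow): Boolean
}

object SingleRowFiltrationSketch {
  // Conceptually what the generated code at the columnar-to-row boundary would do:
  // keep a row only if every pushed broadcast relation contains its join key, so
  // rows that cannot survive any of the nested broadcast joins are dropped early.
  def filterRows(
      batch: ColumnarBatch,
      keyExtractors: Seq[InternalRow => InternalRow],  // one extractor per pushed join
      relations: Seq[BroadcastKeyMembership]): Iterator[InternalRow] = {
    batch.rowIterator().asScala.filter { row =>
      keyExtractors.zip(relations).forall { case (extractKey, rel) =>
        rel.containsKey(extractKey(row))
      }
    }
  }
}
{code}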
[jira] [Updated] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns
[ https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-44662: - Attachment: perf results broadcast var pushdown - Partitioned TPCDS.pdf > SPIP: Improving performance of BroadcastHashJoin queries with stream side > join key on non partition columns > --- > > Key: SPARK-44662 > URL: https://issues.apache.org/jira/browse/SPARK-44662 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.5.1 >Reporter: Asif >Priority: Major > Labels: pull-request-available > Attachments: perf results broadcast var pushdown - Partitioned > TPCDS.pdf > > > h2. *Q1. What are you trying to do? Articulate your objectives using > absolutely no jargon.* > On the lines of DPP which helps DataSourceV2 relations when the joining key > is a partition column, the same concept can be extended over to the case > where joining key is not a partition column. > The Keys of BroadcastHashJoin are already available before actual evaluation > of the stream iterator. These keys can be pushed down to the DataSource as a > SortedSet. > For non partition columns, the DataSources like iceberg have max/min stats on > column available at manifest level, and for formats like parquet , they have > max/min stats at various storage level. The passed SortedSet can be used to > prune using ranges at both driver level ( manifests files) as well as > executor level ( while actually going through chunks , row groups etc at > parquet level) > If the data is stored as Columnar Batch format , then it would not be > possible to filter out individual row at DataSource level, even though we > have keys. > But at the scan level, ( ColumnToRowExec) it is still possible to filter out > as many rows as possible , if the query involves nested joins. Thus reducing > the number of rows to join at the higher join levels. > Will be adding more details.. > h2. *Q2. What problem is this proposal NOT designed to solve?* > This can only help in BroadcastHashJoin's performance if the join is Inner or > Left Semi. > This will also not work if there are nodes like Expand, Generator , Aggregate > (without group by on keys not part of joining column etc) below the > BroadcastHashJoin node being targeted. > h2. *Q3. How is it done today, and what are the limits of current practice?* > Currently this sort of pruning at DataSource level is being done using DPP > (Dynamic Partition Pruning ) and IFF one of the join key column is a > Partitioning column ( so that cost of DPP query is justified and way less > than amount of data it will be filtering by skipping partitions). > The limitation is that DPP type approach is not implemented ( intentionally I > believe), if the join column is a non partition column ( because of cost of > "DPP type" query would most likely be way high as compared to any possible > pruning ( especially if the column is not stored in a sorted manner). > h2. *Q4. What is new in your approach and why do you think it will be > successful?* > 1) This allows pruning on non partition column based joins. > 2) Because it piggy backs on Broadcasted Keys, there is no extra cost of "DPP > type" query. > 3) The Data can be used by DataSource to prune at driver (possibly) and also > at executor level ( as in case of parquet which has max/min at various > structure levels) > 4) The big benefit should be seen in multilevel nested join queries. In the > current code base, if I am correct, only one join's pruning filter would get > pushed at scan level. 
Since it is on partition key may be that is sufficient. > But if it is a nested Join query , and may be involving different columns on > streaming side for join, each such filter push could do significant pruning. > This requires some handling in case of AQE, as the stream side iterator ( & > hence stage evaluation needs to be delayed, till all the available join > filters in the nested tree are pushed at their respective target > BatchScanExec). > h4. *Single Row Filteration* > 5) In case of nested broadcasted joins, if the datasource is column vector > oriented , then what spark would get is a ColumnarBatch. But because scans > have Filters from multiple joins, they can be retrieved and can be applied in > code generated at ColumnToRowExec level, using a new "containsKey" method on > HashedRelation. Thus only those rows which satisfy all the > BroadcastedHashJoins ( whose keys have been pushed) , will be used for join > evaluation. > The code is already there , the PR on spark side is > [spark-broadcast-var|https://github.com/apache/spark/pull/43373]. For non > partition table TPCDS run on laptop with TPCDS data size of ( scale
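The description also notes that under AQE the stream-side stage evaluation has to be delayed until all join filters in the nested tree have been pushed to their target scans. As a purely conceptual illustration of that ordering constraint (this is not AQE code, and StreamSideGate is an invented name), a small latch-based sketch:

{code:scala}
import java.util.concurrent.CountDownLatch

// Toy expression of the ordering constraint: the stream-side scan must not start
// until every broadcast join targeting it has pushed its key filter.
final class StreamSideGate(expectedJoinFilters: Int) {
  private val pending = new CountDownLatch(expectedJoinFilters)

  // Called once per broadcast hash join after its keys have been pushed to the scan.
  def filterPushed(): Unit = pending.countDown()

  // Called by the stream-side stage; blocks until all expected filters are in place.
  def awaitAndStart(startScan: () => Unit): Unit = {
    pending.await()
    startScan()
  }
}
{code}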
[jira] [Updated] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns
[ https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-44662: - Description: h2. *Q1. What are you trying to do? Articulate your objectives using absolutely no jargon.* On the lines of DPP which helps DataSourceV2 relations when the joining key is a partition column, the same concept can be extended over to the case where joining key is not a partition column. The Keys of BroadcastHashJoin are already available before actual evaluation of the stream iterator. These keys can be pushed down to the DataSource as a SortedSet. For non partition columns, the DataSources like iceberg have max/min stats on column available at manifest level, and for formats like parquet , they have max/min stats at various storage level. The passed SortedSet can be used to prune using ranges at both driver level ( manifests files) as well as executor level ( while actually going through chunks , row groups etc at parquet level) If the data is stored as Columnar Batch format , then it would not be possible to filter out individual row at DataSource level, even though we have keys. But at the scan level, ( ColumnToRowExec) it is still possible to filter out as many rows as possible , if the query involves nested joins. Thus reducing the number of rows to join at the higher join levels. Will be adding more details.. h2. *Q2. What problem is this proposal NOT designed to solve?* This can only help in BroadcastHashJoin's performance if the join is Inner or Left Semi. This will also not work if there are nodes like Expand, Generator , Aggregate (without group by on keys not part of joining column etc) below the BroadcastHashJoin node being targeted. h2. *Q3. How is it done today, and what are the limits of current practice?* Currently this sort of pruning at DataSource level is being done using DPP (Dynamic Partition Pruning ) and IFF one of the join key column is a Partitioning column ( so that cost of DPP query is justified and way less than amount of data it will be filtering by skipping partitions). The limitation is that DPP type approach is not implemented ( intentionally I believe), if the join column is a non partition column ( because of cost of "DPP type" query would most likely be way high as compared to any possible pruning ( especially if the column is not stored in a sorted manner). h2. *Q4. What is new in your approach and why do you think it will be successful?* 1) This allows pruning on non partition column based joins. 2) Because it piggy backs on Broadcasted Keys, there is no extra cost of "DPP type" query. 3) The Data can be used by DataSource to prune at driver (possibly) and also at executor level ( as in case of parquet which has max/min at various structure levels) 4) The big benefit should be seen in multilevel nested join queries. In the current code base, if I am correct, only one join's pruning filter would get pushed at scan level. Since it is on partition key may be that is sufficient. But if it is a nested Join query , and may be involving different columns on streaming side for join, each such filter push could do significant pruning. This requires some handling in case of AQE, as the stream side iterator ( & hence stage evaluation needs to be delayed, till all the available join filters in the nested tree are pushed at their respective target BatchScanExec). h4. 
*Single Row Filteration* 5) In case of nested broadcasted joins, if the datasource is column vector oriented , then what spark would get is a ColumnarBatch. But because scans have Filters from multiple joins, they can be retrieved and can be applied in code generated at ColumnToRowExec level, using a new "containsKey" method on HashedRelation. Thus only those rows which satisfy all the BroadcastedHashJoins ( whose keys have been pushed) , will be used for join evaluation. The code is already there , the PR on spark side is [spark-broadcast-var|https://github.com/apache/spark/pull/43373]. For non partition table TPCDS run on laptop with TPCDS data size of ( scale factor 4), I am seeing 15% gain. For partition table TPCDS, there is improvement in 4 - 5 queries to the tune of 10% to 37%. h2. *Q5. Who cares? If you are successful, what difference will it make?* If use cases involve multiple joins especially when the join columns are non partitioned, and performance is a criteria, this PR *might* help. h2. *Q6. What are the risks?* Well the changes are extensive. review will be painful . Though code is being tested continuously and adding more tests , with big change, some possibility of bugs is there. But as of now, I think the code is robust. To get the Perf benefit fully, the pushed filters utilization needs to be implemented on the DataSource side too. Have already done it for {*}iceberg{*}. But I believe atleast in case of Nested Broadcast Hash Joins,
[jira] [Updated] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns
[ https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-44662: - Affects Version/s: 3.5.1 (was: 3.3.3) > SPIP: Improving performance of BroadcastHashJoin queries with stream side > join key on non partition columns > --- > > Key: SPARK-44662 > URL: https://issues.apache.org/jira/browse/SPARK-44662 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.5.1 >Reporter: Asif >Priority: Major > Labels: pull-request-available > > h2. *Q1. What are you trying to do? Articulate your objectives using > absolutely no jargon.* > On the lines of DPP which helps DataSourceV2 relations when the joining key > is a partition column, the same concept can be extended over to the case > where joining key is not a partition column. > The Keys of BroadcastHashJoin are already available before actual evaluation > of the stream iterator. These keys can be pushed down to the DataSource as a > SortedSet. > For non partition columns, the DataSources like iceberg have max/min stats on > column available at manifest level, and for formats like parquet , they have > max/min stats at various storage level. The passed SortedSet can be used to > prune using ranges at both driver level ( manifests files) as well as > executor level ( while actually going through chunks , row groups etc at > parquet level) > If the data is stored as Columnar Batch format , then it would not be > possible to filter out individual row at DataSource level, even though we > have keys. > But at the scan level, ( ColumnToRowExec) it is still possible to filter out > as many rows as possible , if the query involves nested joins. Thus reducing > the number of rows to join at the higher join levels. > Will be adding more details.. > h2. *Q2. What problem is this proposal NOT designed to solve?* > This can only help in BroadcastHashJoin's performance if the join is Inner or > Left Semi. > This will also not work if there are nodes like Expand, Generator , Aggregate > (without group by on keys not part of joining column etc) below the > BroadcastHashJoin node being targeted. > h2. *Q3. How is it done today, and what are the limits of current practice?* > Currently this sort of pruning at DataSource level is being done using DPP > (Dynamic Partition Pruning ) and IFF one of the join key column is a > Partitioning column ( so that cost of DPP query is justified and way less > than amount of data it will be filtering by skipping partitions). > The limitation is that DPP type approach is not implemented ( intentionally I > believe), if the join column is a non partition column ( because of cost of > "DPP type" query would most likely be way high as compared to any possible > pruning ( especially if the column is not stored in a sorted manner). > h2. *Q4. What is new in your approach and why do you think it will be > successful?* > 1) This allows pruning on non partition column based joins. > 2) Because it piggy backs on Broadcasted Keys, there is no extra cost of "DPP > type" query. > 3) The Data can be used by DataSource to prune at driver (possibly) and also > at executor level ( as in case of parquet which has max/min at various > structure levels) > 4) The big benefit should be seen in multilevel nested join queries. In the > current code base, if I am correct, only one join's pruning filter would get > pushed at scan level. Since it is on partition key may be that is sufficient. 
> But if it is a nested Join query , and may be involving different columns on > streaming side for join, each such filter push could do significant pruning. > This requires some handling in case of AQE, as the stream side iterator ( & > hence stage evaluation needs to be delayed, till all the available join > filters in the nested tree are pushed at their respective target > BatchScanExec). > h4. *Single Row Filteration* > 5) In case of nested broadcasted joins, if the datasource is column vector > oriented , then what spark would get is a ColumnarBatch. But because scans > have Filters from multiple joins, they can be retrieved and can be applied in > code generated at ColumnToRowExec level, using a new "containsKey" method on > HashedRelation. Thus only those rows which satisfy all the > BroadcastedHashJoins ( whose keys have been pushed) , will be used for join > evaluation. > The code is already there , the PR on spark side is > [spark-broadcast-var|https://github.com/apache/spark/pull/43373]. For non > partition table TPCDS run on laptop with TPCDS data size of ( scale factor > 4), I am seeing 15% gain. > For partition table TPCDS, there is improvement in 4 - 5
[jira] [Commented] (SPARK-36786) SPIP: Improving the compile time performance, by improving a couple of rules, from 24 hrs to under 8 minutes
[ https://issues.apache.org/jira/browse/SPARK-36786?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17781976#comment-17781976 ] Asif commented on SPARK-36786: -- I had put this on the back burner as my changes were on 3.2, so I have to merge them onto the latest master. Whatever optimizations I did on 3.2 are still applicable, as the drawback still exists, but the changes are going to be a little extensive. If there is interest in it I can pick it up after some days; right now I am occupied with another SPIP which proposes changes for improving the performance of broadcast hash joins on non partition column joins. > SPIP: Improving the compile time performance, by improving a couple of > rules, from 24 hrs to under 8 minutes > - > > Key: SPARK-36786 > URL: https://issues.apache.org/jira/browse/SPARK-36786 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.1, 3.1.2 >Reporter: Asif >Priority: Major > Labels: SPIP > > h2. Q1. What are you trying to do? Articulate your objectives using > absolutely no jargon. > The aim is to improve the compile time performance of a query which in > WorkDay's use case takes > 24 hrs ( & eventually fails) , to < 8 min. > To explain the problem, I will provide the context. > The query plan in our production system is huge, with nested *case when* > expressions ( level of nesting could be > 8) , where each *case when* can > have branches sometimes > 1000. > The plan could look like > {quote}Project1 > | > Filter 1 > | > Project2 > | > Filter2 > | > Project3 > | > Filter3 > | > Join > {quote} > Now the optimizer has a Batch of Rules , intended to run at max 100 times. > *Also note that the batch will continue to run till one of the conditions > is satisfied* > *i.e either numIter == 100 || inputPlan == outputPlan (idempotency is > achieved)* > One of the early Rules is *PushDownPredicateRule*, > followed by *CollapseProject*. > > The first issue is the *PushDownPredicate* rule. > It picks one filter at a time & pushes it to the lowest level ( I understand > that in 3.1 it pushes through join, while in 2.4 it stops at Join) , but > in either case it picks 1 filter at a time starting from top, in each iteration. > *The above comment is no longer true in the 3.1 release as it now combines > filters, so it does push all the encountered filters in a single pass. > But it still materializes the filter on each push by re-aliasing.* > So if there are say 50 projects interspersed with Filters , idempotency > is guaranteed not to be achieved until around 49 iterations. > Moreover, CollapseProject will also be modifying the tree on each iteration as a > filter will get removed within a Project. > Moreover, on each movement of a filter through the project tree, the filter is > re-aliased using transformUp. transformUp is very expensive compared to > transformDown. As the filter keeps getting pushed down , its size increases. > To optimize this rule , 2 things are needed: > # Instead of pushing one filter at a time, collect all the filters as we > traverse the tree in that iteration itself. > # Do not re-alias the filters on each push. Collect the sequence of projects > each filter has passed through, and when the filters have reached their resting > place, do the re-alias by processing the collected projects in down to up > manner. > This will result in achieving idempotency in a couple of iterations. > *How reducing the number of iterations helps in performance* > There are many rules like *NullPropagation, OptimizeIn, SimplifyConditionals > ( ...
there are around 6 more such rules)* which traverse the tree using > transformUp, and they run unnecessarily in each iteration , even when the > expressions in an operator have not changed since the previous runs. > *I have a different proposal which I will share later, as to how to avoid the > above rules from running unnecessarily, if it can be guaranteed that the > expression is not going to mutate in the operator.* > The cause of our huge compilation time has been identified as the above. > > h2. Q2. What problem is this proposal NOT designed to solve? > It is not going to change any runtime profile. > h2. Q3. How is it done today, and what are the limits of current practice? > Like mentioned above , currently PushDownPredicate pushes one filter at a > time & at each Project , it materialized the re-aliased filter. This > results in large number of iterations to achieve idempotency as well as > immediate materialization of Filter after each Project pass,, results in > unnecessary tree traversals of filter expression that too using transformUp. > and the expression tree of filter is bound to keep
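The two optimizations described in the quoted description (collect every filter in one downward pass, and defer the re-aliasing until the filters have reached their final position) can be sketched roughly as below. This is an illustrative simplification over a plain Project/Filter chain using catalyst classes, not the actual PushDownPredicates rule, and it ignores the determinism and pushdown-safety checks the real rule must make.

{code:scala}
import org.apache.spark.sql.catalyst.expressions.{Alias, And, Attribute, ExprId, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}

object OnePassFilterPushdownSketch {

  def pushFiltersInOnePass(plan: LogicalPlan): LogicalPlan = {
    // alias exprId -> defining expression, for one Project
    def aliasMap(p: Project): Map[ExprId, Expression] =
      p.projectList.collect { case a: Alias => a.exprId -> a.child }.toMap

    def substitute(e: Expression, m: Map[ExprId, Expression]): Expression =
      e.transformUp { case a: Attribute if m.contains(a.exprId) => m(a.exprId) }

    // A filter condition plus the number of Projects already crossed above it
    // (those do not need to be substituted into it).
    final case class Pending(cond: Expression, projectsAbove: Int)

    def walk(node: LogicalPlan, pending: Seq[Pending], projects: Seq[Project]): LogicalPlan =
      node match {
        case f: Filter  => walk(f.child, pending :+ Pending(f.condition, projects.size), projects)
        case p: Project => walk(p.child, pending, projects :+ p)
        case leaf =>
          // Re-alias each collected condition exactly once, applying the alias maps of
          // the Projects it is being pushed below, in top-down order.
          val maps = projects.map(aliasMap)
          val rewritten = pending.map { case Pending(c, seen) =>
            maps.drop(seen).foldLeft(c)(substitute)
          }
          val filtered: LogicalPlan =
            rewritten.reduceOption(And).map(Filter(_, leaf)).getOrElse(leaf)
          projects.foldRight(filtered)((p, child) => p.copy(child = child))
      }

    walk(plan, Nil, Nil)
  }
}
{code}

With this shape, a single optimizer pass leaves the filters at their resting place below the Projects, so the batch reaches a fixed point in far fewer iterations than pushing and re-aliasing one filter at a time.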
[jira] [Updated] (SPARK-45658) Canonicalization of DynamicPruningSubquery is broken
[ https://issues.apache.org/jira/browse/SPARK-45658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-45658: - Description: The canonicalization of (buildKeys: Seq[Expression]) in the class DynamicPruningSubquery is broken, as the buildKeys are canonicalized just by calling buildKeys.map(_.canonicalized) The above would result in incorrect canonicalization as it would not be normalizing the exprIds relative to buildQuery output The fix is to use the buildQuery : LogicalPlan's output to normalize the buildKeys expression as given below, using the standard approach. buildKeys.map(QueryPlan.normalizeExpressions(_, buildQuery.output)), Will be filing a PR and bug test for the same. was: The canonicalization of (buildKeys: Seq[Expression]) in the class DynamicPruningSubquery is broken, as the buildKeys are canonicalized just by calling buildKeys.map(_.canonicalized) The above would result in incorrect canonicalization as it would not be normalizing the exprIds The fix is to use the buildQuery : LogicalPlan's output to normalize the buildKeys expression as given below, using the standard approach. buildKeys.map(QueryPlan.normalizeExpressions(_, buildQuery.output)), Will be filing a PR and bug test for the same. > Canonicalization of DynamicPruningSubquery is broken > > > Key: SPARK-45658 > URL: https://issues.apache.org/jira/browse/SPARK-45658 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.0, 3.5.1 >Reporter: Asif >Priority: Major > > The canonicalization of (buildKeys: Seq[Expression]) in the class > DynamicPruningSubquery is broken, as the buildKeys are canonicalized just by > calling > buildKeys.map(_.canonicalized) > The above would result in incorrect canonicalization as it would not be > normalizing the exprIds relative to buildQuery output > The fix is to use the buildQuery : LogicalPlan's output to normalize the > buildKeys expression > as given below, using the standard approach. > buildKeys.map(QueryPlan.normalizeExpressions(_, buildQuery.output)), > Will be filing a PR and bug test for the same. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45658) Canonicalization of DynamicPruningSubquery is broken
[ https://issues.apache.org/jira/browse/SPARK-45658?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-45658: - Priority: Major (was: Critical) > Canonicalization of DynamicPruningSubquery is broken > > > Key: SPARK-45658 > URL: https://issues.apache.org/jira/browse/SPARK-45658 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.5.0, 3.5.1 >Reporter: Asif >Priority: Major > > The canonicalization of (buildKeys: Seq[Expression]) in the class > DynamicPruningSubquery is broken, as the buildKeys are canonicalized just by > calling > buildKeys.map(_.canonicalized) > The above would result in incorrect canonicalization as it would not be > normalizing the exprIds > The fix is to use the buildQuery : LogicalPlan's output to normalize the > buildKeys expression > as given below, using the standard approach. > buildKeys.map(QueryPlan.normalizeExpressions(_, buildQuery.output)), > Will be filing a PR and bug test for the same. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-45658) Canonicalization of DynamicPruningSubquery is broken
Asif created SPARK-45658: Summary: Canonicalization of DynamicPruningSubquery is broken Key: SPARK-45658 URL: https://issues.apache.org/jira/browse/SPARK-45658 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.5.0, 3.5.1 Reporter: Asif The canonicalization of (buildKeys: Seq[Expression]) in the class DynamicPruningSubquery is broken, as the buildKeys are canonicalized just by calling buildKeys.map(_.canonicalized) The above would result in incorrect canonicalization as it would not be normalizing the exprIds The fix is to use the buildQuery : LogicalPlan's output to normalize the buildKeys expression as given below, using the standard approach. buildKeys.map(QueryPlan.normalizeExpressions(_, buildQuery.output)), Will be filing a PR and bug test for the same. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-45373) Minimizing calls to HiveMetaStore layer for getting partitions, when tables are repeated
[ https://issues.apache.org/jira/browse/SPARK-45373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-45373: - Affects Version/s: 4.0.0 (was: 3.5.1) > Minimizing calls to HiveMetaStore layer for getting partitions, when tables > are repeated > - > > Key: SPARK-45373 > URL: https://issues.apache.org/jira/browse/SPARK-45373 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 4.0.0 >Reporter: Asif >Priority: Minor > Labels: pull-request-available > Fix For: 3.5.1 > > > In the rule PruneFileSourcePartitions where the CatalogFileIndex gets > converted to InMemoryFileIndex, the HMS calls can get very expensive if : > 1) The translated filter string for push down to HMS layer becomes empty , > resulting in fetching of all partitions and same table is referenced multiple > times in the query. > 2) Or just in case same table is referenced multiple times in the query with > different partition filters. > In such cases current code would result in multiple calls to HMS layer. > This can be avoided by grouping the tables based on CatalogFileIndex and > passing a common minimum filter ( filter1 || filter2) and getting a base > PrunedInmemoryFileIndex which can become a basis for each of the specific > table. > Opened following PR for ticket: > [SPARK-45373-PR|https://github.com/apache/spark/pull/43183] -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
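To illustrate the grouping idea in the description above, a hedged sketch follows. It assumes the existing CatalogFileIndex.filterPartitions entry point is where the HiveMetaStore listing ultimately happens; sharedBaseIndexes is an invented helper name, and the actual PR does the grouping and pruning inside the rule rather than through a free-standing function.

{code:scala}
import org.apache.spark.sql.catalyst.expressions.{And, Expression, Or}
import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, InMemoryFileIndex}

object GroupedPartitionPruningSketch {
  // One HMS-backed listing per distinct CatalogFileIndex instead of one per table
  // reference: list with the OR of all per-reference partition filters, then let each
  // reference prune further, in memory, from the shared base index.
  def sharedBaseIndexes(
      references: Seq[(CatalogFileIndex, Seq[Expression])])
    : Map[CatalogFileIndex, InMemoryFileIndex] = {
    references.groupBy(_._1).map { case (catalogIndex, refs) =>
      val perRefFilters = refs.map(_._2)
      // If any reference has no usable partition filter, the common filter must be
      // empty too (i.e. fetch all partitions, but only once); otherwise OR them.
      val commonFilter: Seq[Expression] =
        if (perRefFilters.exists(_.isEmpty)) Nil
        else Seq(perRefFilters.map(_.reduce(And)).reduce(Or))
      catalogIndex -> catalogIndex.filterPartitions(commonFilter)
    }
  }
}
{code}

Grouping by the CatalogFileIndex instance is a simplification; the point is simply that a single listing with (filter1 || filter2) subsumes what both table references need, after which each reference can be narrowed without another metastore round trip.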
[jira] [Updated] (SPARK-45373) Minimizing calls to HiveMetaStore layer for getting partitions, when tables are repeated
[ https://issues.apache.org/jira/browse/SPARK-45373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-45373: - Description: In the rule PruneFileSourcePartitions where the CatalogFileIndex gets converted to InMemoryFileIndex, the HMS calls can get very expensive if : 1) The translated filter string for push down to HMS layer becomes empty , resulting in fetching of all partitions and same table is referenced multiple times in the query. 2) Or just in case same table is referenced multiple times in the query with different partition filters. In such cases current code would result in multiple calls to HMS layer. This can be avoided by grouping the tables based on CatalogFileIndex and passing a common minimum filter ( filter1 || filter2) and getting a base PrunedInmemoryFileIndex which can become a basis for each of the specific table. Opened following PR for ticket: [SPARK-45373-PR|https://github.com/apache/spark/pull/43183] was: In the rule PruneFileSourcePartitions where the CatalogFileIndex gets converted to InMemoryFileIndex, the HMS calls can get very expensive if : 1) The translated filter string for push down to HMS layer becomes empty , resulting in fetching of all partitions and same table is referenced multiple times in the query. 2) Or just in case same table is referenced multiple times in the query with different partition filters. In such cases current code would result in multiple calls to HMS layer. This can be avoided by grouping the tables based on CatalogFileIndex and passing a common minimum filter ( filter1 || filter2) and getting a base PrunedInmemoryFileIndex which can become a basis for each of the specific table. > Minimizing calls to HiveMetaStore layer for getting partitions, when tables > are repeated > - > > Key: SPARK-45373 > URL: https://issues.apache.org/jira/browse/SPARK-45373 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.5.1 >Reporter: Asif >Priority: Minor > Labels: pull-request-available > Fix For: 3.5.1 > > > In the rule PruneFileSourcePartitions where the CatalogFileIndex gets > converted to InMemoryFileIndex, the HMS calls can get very expensive if : > 1) The translated filter string for push down to HMS layer becomes empty , > resulting in fetching of all partitions and same table is referenced multiple > times in the query. > 2) Or just in case same table is referenced multiple times in the query with > different partition filters. > In such cases current code would result in multiple calls to HMS layer. > This can be avoided by grouping the tables based on CatalogFileIndex and > passing a common minimum filter ( filter1 || filter2) and getting a base > PrunedInmemoryFileIndex which can become a basis for each of the specific > table. > Opened following PR for ticket: > [SPARK-45373-PR|https://github.com/apache/spark/pull/43183] -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] (SPARK-45373) Minimizing calls to HiveMetaStore layer for getting partitions, when tables are repeated
[ https://issues.apache.org/jira/browse/SPARK-45373 ] Asif deleted comment on SPARK-45373: -- was (Author: ashahid7): Will be generating a PR for this. > Minimizing calls to HiveMetaStore layer for getting partitions, when tables > are repeated > - > > Key: SPARK-45373 > URL: https://issues.apache.org/jira/browse/SPARK-45373 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.5.1 >Reporter: Asif >Priority: Minor > Labels: pull-request-available > Fix For: 3.5.1 > > > In the rule PruneFileSourcePartitions where the CatalogFileIndex gets > converted to InMemoryFileIndex, the HMS calls can get very expensive if : > 1) The translated filter string for push down to HMS layer becomes empty , > resulting in fetching of all partitions and same table is referenced multiple > times in the query. > 2) Or just in case same table is referenced multiple times in the query with > different partition filters. > In such cases current code would result in multiple calls to HMS layer. > This can be avoided by grouping the tables based on CatalogFileIndex and > passing a common minimum filter ( filter1 || filter2) and getting a base > PrunedInmemoryFileIndex which can become a basis for each of the specific > table. > Opened following PR for ticket: > [SPARK-45373-PR|https://github.com/apache/spark/pull/43183] -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-45373) Minimizing calls to HiveMetaStore layer for getting partitions, when tables are repeated
[ https://issues.apache.org/jira/browse/SPARK-45373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17770220#comment-17770220 ] Asif commented on SPARK-45373: -- Will be generating a PR for this. > Minimizing calls to HiveMetaStore layer for getting partitions, when tables > are repeated > - > > Key: SPARK-45373 > URL: https://issues.apache.org/jira/browse/SPARK-45373 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.5.1 >Reporter: Asif >Priority: Minor > Fix For: 3.5.1 > > > In the rule PruneFileSourcePartitions where the CatalogFileIndex gets > converted to InMemoryFileIndex, the HMS calls can get very expensive if : > 1) The translated filter string for push down to HMS layer becomes empty , > resulting in fetching of all partitions and same table is referenced multiple > times in the query. > 2) Or just in case same table is referenced multiple times in the query with > different partition filters. > In such cases current code would result in multiple calls to HMS layer. > This can be avoided by grouping the tables based on CatalogFileIndex and > passing a common minimum filter ( filter1 || filter2) and getting a base > PrunedInmemoryFileIndex which can become a basis for each of the specific > table. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-45373) Minimizing calls to HiveMetaStore layer for getting partitions, when tables are repeated
Asif created SPARK-45373: Summary: Minimizing calls to HiveMetaStore layer for getting partitions, when tables are repeated Key: SPARK-45373 URL: https://issues.apache.org/jira/browse/SPARK-45373 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.5.1 Reporter: Asif Fix For: 3.5.1 In the rule PruneFileSourcePartitions where the CatalogFileIndex gets converted to InMemoryFileIndex, the HMS calls can get very expensive if : 1) The translated filter string for push down to HMS layer becomes empty , resulting in fetching of all partitions and same table is referenced multiple times in the query. 2) Or just in case same table is referenced multiple times in the query with different partition filters. In such cases current code would result in multiple calls to HMS layer. This can be avoided by grouping the tables based on CatalogFileIndex and passing a common minimum filter ( filter1 || filter2) and getting a base PrunedInmemoryFileIndex which can become a basis for each of the specific table. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns
[ https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-44662: - Description: h2. *Q1. What are you trying to do? Articulate your objectives using absolutely no jargon.* On the lines of DPP which helps DataSourceV2 relations when the joining key is a partition column, the same concept can be extended over to the case where joining key is not a partition column. The Keys of BroadcastHashJoin are already available before actual evaluation of the stream iterator. These keys can be pushed down to the DataSource as a SortedSet. For non partition columns, the DataSources like iceberg have max/min stats on column available at manifest level, and for formats like parquet , they have max/min stats at various storage level. The passed SortedSet can be used to prune using ranges at both driver level ( manifests files) as well as executor level ( while actually going through chunks , row groups etc at parquet level) If the data is stored as Columnar Batch format , then it would not be possible to filter out individual row at DataSource level, even though we have keys. But at the scan level, ( ColumnToRowExec) it is still possible to filter out as many rows as possible , if the query involves nested joins. Thus reducing the number of rows to join at the higher join levels. Will be adding more details.. h2. *Q2. What problem is this proposal NOT designed to solve?* This can only help in BroadcastHashJoin's performance if the join is Inner or Left Semi. This will also not work if there are nodes like Expand, Generator , Aggregate (without group by on keys not part of joining column etc) below the BroadcastHashJoin node being targeted. h2. *Q3. How is it done today, and what are the limits of current practice?* Currently this sort of pruning at DataSource level is being done using DPP (Dynamic Partition Pruning ) and IFF one of the join key column is a Partitioning column ( so that cost of DPP query is justified and way less than amount of data it will be filtering by skipping partitions). The limitation is that DPP type approach is not implemented ( intentionally I believe), if the join column is a non partition column ( because of cost of "DPP type" query would most likely be way high as compared to any possible pruning ( especially if the column is not stored in a sorted manner). h2. *Q4. What is new in your approach and why do you think it will be successful?* 1) This allows pruning on non partition column based joins. 2) Because it piggy backs on Broadcasted Keys, there is no extra cost of "DPP type" query. 3) The Data can be used by DataSource to prune at driver (possibly) and also at executor level ( as in case of parquet which has max/min at various structure levels) 4) The big benefit should be seen in multilevel nested join queries. In the current code base, if I am correct, only one join's pruning filter would get pushed at scan level. Since it is on partition key may be that is sufficient. But if it is a nested Join query , and may be involving different columns on streaming side for join, each such filter push could do significant pruning. This requires some handling in case of AQE, as the stream side iterator ( & hence stage evaluation needs to be delayed, till all the available join filters in the nested tree are pushed at their respective target BatchScanExec). h4. 
*Single Row Filteration* 5) In case of nested broadcasted joins, if the datasource is column vector oriented , then what spark would get is a ColumnarBatch. But because scans have Filters from multiple joins, they can be retrieved and can be applied in code generated at ColumnToRowExec level, using a new "containsKey" method on HashedRelation. Thus only those rows which satisfy all the BroadcastedHashJoins ( whose keys have been pushed) , will be used for join evaluation. The code is already there , will be opening a PR. For non partition table TPCDS run on laptop with TPCDS data size of ( scale factor 4), I am seeing 15% gain. For partition table TPCDS, there is improvement in 4 - 5 queries to the tune of 10% to 37%. h2. *Q5. Who cares? If you are successful, what difference will it make?* If use cases involve multiple joins especially when the join columns are non partitioned, and performance is a criteria, this PR *might* help. h2. *Q6. What are the risks?* Well the changes are extensive. review will be painful . Though code is being tested continuously and adding more tests , with big change, some possibility of bugs is there. But as of now, I think the code is robust. To get the Perf benefit fully, the pushed filters utilization needs to be implemented on the DataSource side too. Have already done it for {*}iceberg{*}. But I believe atleast in case of Nested Broadcast Hash Joins, [#singleRowFilter] approach would still result in perf benefit, even with Default
[jira] [Updated] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns
[ https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-44662: - Description: h2. *Q1. What are you trying to do? Articulate your objectives using absolutely no jargon.* On the lines of DPP which helps DataSourceV2 relations when the joining key is a partition column, the same concept can be extended over to the case where joining key is not a partition column. The Keys of BroadcastHashJoin are already available before actual evaluation of the stream iterator. These keys can be pushed down to the DataSource as a SortedSet. For non partition columns, the DataSources like iceberg have max/min stats on column available at manifest level, and for formats like parquet , they have max/min stats at various storage level. The passed SortedSet can be used to prune using ranges at both driver level ( manifests files) as well as executor level ( while actually going through chunks , row groups etc at parquet level) If the data is stored as Columnar Batch format , then it would not be possible to filter out individual row at DataSource level, even though we have keys. But at the scan level, ( ColumnToRowExec) it is still possible to filter out as many rows as possible , if the query involves nested joins. Thus reducing the number of rows to join at the higher join levels. Will be adding more details.. h2. *Q2. What problem is this proposal NOT designed to solve?* This can only help in BroadcastHashJoin's performance if the join is Inner or Left Semi. This will also not work if there are nodes like Expand, Generator , Aggregate (without group by on keys not part of joining column etc) below the BroadcastHashJoin node being targeted. h2. *Q3. How is it done today, and what are the limits of current practice?* Currently this sort of pruning at DataSource level is being done using DPP (Dynamic Partition Pruning ) and IFF one of the join key column is a Partitioning column ( so that cost of DPP query is justified and way less than amount of data it will be filtering by skipping partitions). The limitation is that DPP type approach is not implemented ( intentionally I believe), if the join column is a non partition column ( because of cost of "DPP type" query would most likely be way high as compared to any possible pruning ( especially if the column is not stored in a sorted manner). h2. *Q4. What is new in your approach and why do you think it will be successful?* 1) This allows pruning on non partition column based joins. 2) Because it piggy backs on Broadcasted Keys, there is no extra cost of "DPP type" query. 3) The Data can be used by DataSource to prune at driver (possibly) and also at executor level ( as in case of parquet which has max/min at various structure levels) 4) The big benefit should be seen in multilevel nested join queries. In the current code base, if I am correct, only one join's pruning filter would get pushed at scan level. Since it is on partition key may be that is sufficient. But if it is a nested Join query , and may be involving different columns on streaming side for join, each such filter push could do significant pruning. This requires some handling in case of AQE, as the stream side iterator ( & hence stage evaluation needs to be delayed, till all the available join filters in the nested tree are pushed at their respective target BatchScanExec). 
{anchor:singleRowfilter}5) In case of nested broadcasted joins, if the datasource is column vector oriented , then what spark would get is a ColumnarBatch. But because scans have Filters from multiple joins, they can be retrieved and can be applied in code generated at ColumnToRowExec level, using a new "containsKey" method on HashedRelation. Thus only those rows which satisfy all the BroadcastedHashJoins ( whose keys have been pushed) , will be used for join evaluation. The code is already there , will be opening a PR. For non partition table TPCDS run on laptop with TPCDS data size of ( scale factor 4), I am seeing 15% gain. For partition table TPCDS, there is improvement in 4 - 5 queries to the tune of 10% to 37%. h2. *Q5. Who cares? If you are successful, what difference will it make?* If use cases involve multiple joins especially when the join columns are non partitioned, and performance is a criteria, this PR *might* help. h2. *Q6. What are the risks?* Well the changes are extensive. review will be painful ( if it happens). Though code is being tested continuously and adding more tests , with big change, some possibility of bugs is there. But as of now, I think the code is robust. To get the Perf benefit fully, the pushed filters utilization needs to be implemented on the DataSource side too. Have already done it for *iceberg*. But I believe atleast in case of Nested Broadcast Hash Joins, [#singleRowfilter] approach would still result in perf benefit. h2. *Q7.
[jira] [Updated] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns
[ https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-44662: - Description: h2. *Q1. What are you trying to do? Articulate your objectives using absolutely no jargon.* On the lines of DPP which helps DataSourceV2 relations when the joining key is a partition column, the same concept can be extended over to the case where joining key is not a partition column. The Keys of BroadcastHashJoin are already available before actual evaluation of the stream iterator. These keys can be pushed down to the DataSource as a SortedSet. For non partition columns, the DataSources like iceberg have max/min stats on column available at manifest level, and for formats like parquet , they have max/min stats at various storage level. The passed SortedSet can be used to prune using ranges at both driver level ( manifests files) as well as executor level ( while actually going through chunks , row groups etc at parquet level) If the data is stored as Columnar Batch format , then it would not be possible to filter out individual row at DataSource level, even though we have keys. But at the scan level, ( ColumnToRowExec) it is still possible to filter out as many rows as possible , if the query involves nested joins. Thus reducing the number of rows to join at the higher join levels. Will be adding more details.. h2. *Q2. What problem is this proposal NOT designed to solve?* This can only help in BroadcastHashJoin's performance if the join is Inner or Left Semi. This will also not work if there are nodes like Expand, Generator , Aggregate (without group by on keys not part of joining column etc) below the BroadcastHashJoin node being targeted. h2. *Q3. How is it done today, and what are the limits of current practice?* Currently this sort of pruning at DataSource level is being done using DPP (Dynamic Partition Pruning ) and IFF one of the join key column is a Partitioning column ( so that cost of DPP query is justified and way less than amount of data it will be filtering by skipping partitions). The limitation is that DPP type approach is not implemented ( intentionally I believe), if the join column is a non partition column ( because of cost of "DPP type" query would most likely be way high as compared to any possible pruning ( especially if the column is not stored in a sorted manner). h2. *Q4. What is new in your approach and why do you think it will be successful?* 1) This allows pruning on non partition column based joins. 2) Because it piggy backs on Broadcasted Keys, there is no extra cost of "DPP type" query. 3) The Data can be used by DataSource to prune at driver (possibly) and also at executor level ( as in case of parquet which has max/min at various structure levels) 4) The big benefit should be seen in multilevel nested join queries. In the current code base, if I am correct, only one join's pruning filter would get pushed at scan level. Since it is on partition key may be that is sufficient. But if it is a nested Join query , and may be involving different columns on streaming side for join, each such filter push could do significant pruning. This requires some handling in case of AQE, as the stream side iterator ( & hence stage evaluation needs to be delayed, till all the available join filters in the nested tree are pushed at their respective target BatchScanExec). 
{anchor: singleRowFilter}5) In case of nested broadcasted joins, if the datasource is column vector oriented , then what spark would get is a ColumnarBatch. But because scans have Filters from multiple joins, they can be retrieved and can be applied in code generated at ColumnToRowExec level, using a new "containsKey" method on HashedRelation. Thus only those rows which satisfy all the BroadcastedHashJoins ( whose keys have been pushed) , will be used for join evaluation. The code is already there , will be opening a PR. For non partition table TPCDS run on laptop with TPCDS data size of ( scale factor 4), I am seeing 15% gain. For partition table TPCDS, there is improvement in 4 - 5 queries to the tune of 10% to 37%. h2. *Q5. Who cares? If you are successful, what difference will it make?* If use cases involve multiple joins especially when the join columns are non partitioned, and performance is a criteria, this PR *might* help. h2. *Q6. What are the risks?* Well the changes are extensive. review will be painful ( if it happens). Though code is being tested continuously and adding more tests , with big change, some possibility of bugs is there. But as of now, I think the code is robust. To get the Perf benefit fully, the pushed filters utilization needs to be implemented on the DataSource side too. Have already done it for *iceberg*. But I believe atleast in case of Nested Broadcast Hash Joins, [#singleRowFilter] approach would still result in perf benefit. h2. *Q7.
[jira] [Updated] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns
[ https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-44662: - Description: h2. *Q1. What are you trying to do? Articulate your objectives using absolutely no jargon.* On the lines of DPP which helps DataSourceV2 relations when the joining key is a partition column, the same concept can be extended over to the case where joining key is not a partition column. The Keys of BroadcastHashJoin are already available before actual evaluation of the stream iterator. These keys can be pushed down to the DataSource as a SortedSet. For non partition columns, the DataSources like iceberg have max/min stats on column available at manifest level, and for formats like parquet , they have max/min stats at various storage level. The passed SortedSet can be used to prune using ranges at both driver level ( manifests files) as well as executor level ( while actually going through chunks , row groups etc at parquet level) If the data is stored as Columnar Batch format , then it would not be possible to filter out individual row at DataSource level, even though we have keys. But at the scan level, ( ColumnToRowExec) it is still possible to filter out as many rows as possible , if the query involves nested joins. Thus reducing the number of rows to join at the higher join levels. Will be adding more details.. h2. *Q2. What problem is this proposal NOT designed to solve?* This can only help in BroadcastHashJoin's performance if the join is Inner or Left Semi. This will also not work if there are nodes like Expand, Generator , Aggregate (without group by on keys not part of joining column etc) below the BroadcastHashJoin node being targeted. h2. *Q3. How is it done today, and what are the limits of current practice?* Currently this sort of pruning at DataSource level is being done using DPP (Dynamic Partition Pruning ) and IFF one of the join key column is a Partitioning column ( so that cost of DPP query is justified and way less than amount of data it will be filtering by skipping partitions). The limitation is that DPP type approach is not implemented ( intentionally I believe), if the join column is a non partition column ( because of cost of "DPP type" query would most likely be way high as compared to any possible pruning ( especially if the column is not stored in a sorted manner). h2. *Q4. What is new in your approach and why do you think it will be successful?* 1) This allows pruning on non partition column based joins. 2) Because it piggy backs on Broadcasted Keys, there is no extra cost of "DPP type" query. 3) The Data can be used by DataSource to prune at driver (possibly) and also at executor level ( as in case of parquet which has max/min at various structure levels) 4) The big benefit should be seen in multilevel nested join queries. In the current code base, if I am correct, only one join's pruning filter would get pushed at scan level. Since it is on partition key may be that is sufficient. But if it is a nested Join query , and may be involving different columns on streaming side for join, each such filter push could do significant pruning. This requires some handling in case of AQE, as the stream side iterator ( & hence stage evaluation needs to be delayed, till all the available join filters in the nested tree are pushed at their respective target BatchScanExec). 
{anchor:singleRowFilter}5) In case of nested broadcasted joins, if the datasource is column vector oriented , then what spark would get is a ColumnarBatch. But because scans have Filters from multiple joins, they can be retrieved and can be applied in code generated at ColumnToRowExec level, using a new "containsKey" method on HashedRelation. Thus only those rows which satisfy all the BroadcastedHashJoins ( whose keys have been pushed) , will be used for join evaluation. The code is already there , will be opening a PR. For non partition table TPCDS run on laptop with TPCDS data size of ( scale factor 4), I am seeing 15% gain. For partition table TPCDS, there is improvement in 4 - 5 queries to the tune of 10% to 37%. h2. *Q5. Who cares? If you are successful, what difference will it make?* If use cases involve multiple joins especially when the join columns are non partitioned, and performance is a criteria, this PR *might* help. h2. *Q6. What are the risks?* Well the changes are extensive. review will be painful ( if it happens). Though code is being tested continuously and adding more tests , with big change, some possibility of bugs is there. But as of now, I think the code is robust. To get the Perf benefit fully, the pushed filters utilization needs to be implemented on the DataSource side too. Have already done it for *iceberg*. But I believe atleast in case of Nested Broadcast Hash Joins, [#singleRowFilter] approach would still result in perf benefit. h2. *Q7.
[jira] [Updated] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns
[ https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-44662: - Description: h2. *Q1. What are you trying to do? Articulate your objectives using absolutely no jargon.* On the lines of DPP which helps DataSourceV2 relations when the joining key is a partition column, the same concept can be extended over to the case where joining key is not a partition column. The Keys of BroadcastHashJoin are already available before actual evaluation of the stream iterator. These keys can be pushed down to the DataSource as a SortedSet. For non partition columns, the DataSources like iceberg have max/min stats on column available at manifest level, and for formats like parquet , they have max/min stats at various storage level. The passed SortedSet can be used to prune using ranges at both driver level ( manifests files) as well as executor level ( while actually going through chunks , row groups etc at parquet level) If the data is stored as Columnar Batch format , then it would not be possible to filter out individual row at DataSource level, even though we have keys. But at the scan level, ( ColumnToRowExec) it is still possible to filter out as many rows as possible , if the query involves nested joins. Thus reducing the number of rows to join at the higher join levels. Will be adding more details.. h2. *Q2. What problem is this proposal NOT designed to solve?* This can only help in BroadcastHashJoin's performance if the join is Inner or Left Semi. This will also not work if there are nodes like Expand, Generator , Aggregate (without group by on keys not part of joining column etc) below the BroadcastHashJoin node being targeted. h2. *Q3. How is it done today, and what are the limits of current practice?* Currently this sort of pruning at DataSource level is being done using DPP (Dynamic Partition Pruning ) and IFF one of the join key column is a Partitioning column ( so that cost of DPP query is justified and way less than amount of data it will be filtering by skipping partitions). The limitation is that DPP type approach is not implemented ( intentionally I believe), if the join column is a non partition column ( because of cost of "DPP type" query would most likely be way high as compared to any possible pruning ( especially if the column is not stored in a sorted manner). h2. *Q4. What is new in your approach and why do you think it will be successful?* 1) This allows pruning on non partition column based joins. 2) Because it piggy backs on Broadcasted Keys, there is no extra cost of "DPP type" query. 3) The Data can be used by DataSource to prune at driver (possibly) and also at executor level ( as in case of parquet which has max/min at various structure levels) 4) The big benefit should be seen in multilevel nested join queries. In the current code base, if I am correct, only one join's pruning filter would get pushed at scan level. Since it is on partition key may be that is sufficient. But if it is a nested Join query , and may be involving different columns on streaming side for join, each such filter push could do significant pruning. This requires some handling in case of AQE, as the stream side iterator ( & hence stage evaluation needs to be delayed, till all the available join filters in the nested tree are pushed at their respective target BatchScanExec). 
{:singleRowFilter} 5) In case of nested broadcasted joins, if the datasource is column vector oriented , then what spark would get is a ColumnarBatch. But because scans have Filters from multiple joins, they can be retrieved and can be applied in code generated at ColumnToRowExec level, using a new "containsKey" method on HashedRelation. Thus only those rows which satisfy all the BroadcastedHashJoins ( whose keys have been pushed) , will be used for join evaluation. The code is already there , will be opening a PR. For non partition table TPCDS run on laptop with TPCDS data size of ( scale factor 4), I am seeing 15% gain. For partition table TPCDS, there is improvement in 4 - 5 queries to the tune of 10% to 37%. h2. *Q5. Who cares? If you are successful, what difference will it make?* If use cases involve multiple joins especially when the join columns are non partitioned, and performance is a criteria, this PR *might* help. h2. *Q6. What are the risks?* Well the changes are extensive. review will be painful ( if it happens). Though code is being tested continuously and adding more tests , with big change, some possibility of bugs is there. But as of now, I think the code is robust. To get the Perf benefit fully, the pushed filters utilization needs to be implemented on the DataSource side too. Have already done it for *iceberg*. But I believe atleast in case of Nested Broadcast Hash Joins, [#singleRowFilter] approach would still result in perf benefit. h2. *Q7. How
[jira] [Updated] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns
[ https://issues.apache.org/jira/browse/SPARK-44662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-44662: - Description:
h2. *Q1. What are you trying to do? Articulate your objectives using absolutely no jargon.*
Along the lines of DPP, which helps DataSourceV2 relations when the joining key is a partition column, the same concept can be extended to the case where the joining key is not a partition column. The keys of a BroadcastHashJoin are already available before the stream-side iterator is actually evaluated. These keys can be pushed down to the DataSource as a SortedSet. For non-partition columns, DataSources like iceberg have max/min column stats available at the manifest level, and formats like parquet have max/min stats at various storage levels. The pushed SortedSet can be used to prune by range both at the driver level (manifest files) and at the executor level (while actually going through chunks, row groups etc. at the parquet level). If the data is stored in a columnar batch format, it is not possible to filter out individual rows at the DataSource level, even though the keys are available. But at the scan level (ColumnToRowExec) it is still possible to filter out as many rows as possible if the query involves nested joins, thus reducing the number of rows to join at the higher join levels. Will be adding more details.
h2. *Q2. What problem is this proposal NOT designed to solve?*
This can only help BroadcastHashJoin performance if the join is Inner or Left Semi. It will also not work if there are nodes like Expand, Generator or Aggregate (without a group-by on the joining columns, etc.) below the targeted BroadcastHashJoin node.
h2. *Q3. How is it done today, and what are the limits of current practice?*
Currently this sort of pruning at the DataSource level is done using DPP (Dynamic Partition Pruning), and only if one of the join key columns is a partitioning column (so that the cost of the DPP query is justified and far less than the amount of data it filters out by skipping partitions). The limitation is that the DPP-style approach is not implemented (intentionally, I believe) when the join column is a non-partition column, because the cost of a "DPP type" query would most likely be much higher than the benefit of any possible pruning (especially if the column is not stored in sorted order).
h2. *Q4. What is new in your approach and why do you think it will be successful?*
1) It allows pruning for joins on non-partition columns.
2) Because it piggybacks on the broadcasted keys, there is no extra cost of a "DPP type" query.
3) The keys can be used by the DataSource to prune at the driver (possibly) and also at the executor level (as in parquet, which has max/min stats at various structure levels).
4) The big benefit should be seen in multi-level nested join queries. In the current code base, if I am correct, only one join's pruning filter gets pushed to the scan level; since it is on the partition key, maybe that is sufficient. But in a nested join query, possibly involving different stream-side columns for each join, each such pushed filter could do significant pruning. This requires some handling in the case of AQE, as the stream-side iterator (and hence stage evaluation) needs to be delayed until all the available join filters in the nested tree are pushed to their respective target BatchScanExec nodes.
{anchor:singleRowFilter} 5) In the case of nested broadcast joins, if the DataSource is column-vector oriented, what Spark gets is a ColumnarBatch.
But because the scans carry filters from multiple joins, those filters can be retrieved and applied in the code generated at the ColumnToRowExec level, using a new "containsKey" method on HashedRelation. Thus only those rows which satisfy all the BroadcastHashJoins whose keys have been pushed are used for join evaluation. The code is already there; will be opening a PR. For a non-partitioned-table TPCDS run on a laptop with TPCDS data at scale factor 4, I am seeing a 15% gain. For partitioned-table TPCDS, 4 - 5 queries improve to the tune of 10% to 37%.
h2. *Q5. Who cares? If you are successful, what difference will it make?*
If use cases involve multiple joins, especially when the join columns are not partitioned, and performance is a criterion, this PR *might* help.
h2. *Q6. What are the risks?*
The changes are extensive, so review will be painful (if it happens). Though the code is being tested continuously and more tests are being added, with a change this big some possibility of bugs remains; as of now, I think the code is robust. To get the performance benefit fully, utilization of the pushed filters also needs to be implemented on the DataSource side. That has already been done for *iceberg*. But I believe that at least in the case of nested Broadcast Hash Joins, the [#singleRowFilter] approach would still result in a performance benefit.
h2. *Q7. How
[jira] [Created] (SPARK-44662) SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns
Asif created SPARK-44662: Summary: SPIP: Improving performance of BroadcastHashJoin queries with stream side join key on non partition columns Key: SPARK-44662 URL: https://issues.apache.org/jira/browse/SPARK-44662 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.3.3 Reporter: Asif Fix For: 3.3.3
h2. *Q1. What are you trying to do? Articulate your objectives using absolutely no jargon.*
Along the lines of DPP, which helps DataSourceV2 relations when the joining key is a partition column, the same concept can be extended to the case where the joining key is not a partition column. The keys of a BroadcastHashJoin are already available before the stream-side iterator is actually evaluated. These keys can be pushed down to the DataSource as a SortedSet. For non-partition columns, DataSources like iceberg have max/min column stats available at the manifest level, and formats like parquet have max/min stats at various storage levels. The pushed SortedSet can be used to prune by range both at the driver level (manifest files) and at the executor level (while actually going through chunks, row groups etc. at the parquet level). If the data is stored in a columnar batch format, it is not possible to filter out individual rows at the DataSource level, even though the keys are available. But at the scan level (ColumnToRowExec) it is still possible to filter out as many rows as possible if the query involves nested joins, thus reducing the number of rows to join at the higher join levels. Will be adding more details.
h2. *Q2. What problem is this proposal NOT designed to solve?*
This can only help BroadcastHashJoin performance if the join is Inner or Left Semi. It will also not work if there are nodes like Expand, Generator or Aggregate (without a group-by on the joining columns, etc.) below the targeted BroadcastHashJoin node.
h2. *Q3. How is it done today, and what are the limits of current practice?*
Currently this sort of pruning at the DataSource level is done using DPP (Dynamic Partition Pruning), and only if one of the join key columns is a partitioning column (so that the cost of the DPP query is justified and far less than the amount of data it filters out by skipping partitions). The limitation is that the DPP-style approach is not implemented (intentionally, I believe) when the join column is a non-partition column, because the cost of a "DPP type" query would most likely be much higher than the benefit of any possible pruning (especially if the column is not stored in sorted order).
h2. *Q4. What is new in your approach and why do you think it will be successful?*
1) It allows pruning for joins on non-partition columns.
2) Because it piggybacks on the broadcasted keys, there is no extra cost of a "DPP type" query.
3) The keys can be used by the DataSource to prune at the driver (possibly) and also at the executor level (as in parquet, which has max/min stats at various structure levels).
4) The big benefit should be seen in multi-level nested join queries. In the current code base, if I am correct, only one join's pruning filter gets pushed to the scan level; since it is on the partition key, maybe that is sufficient. But in a nested join query, possibly involving different stream-side columns for each join, each such pushed filter could do significant pruning.
This requires some handling in the case of AQE, as the stream-side iterator (and hence stage evaluation) needs to be delayed until all the available join filters in the nested tree are pushed to their respective target BatchScanExec nodes.
{anchor:singleRowFilter} 5) In the case of nested broadcast joins, if the DataSource is column-vector oriented, what Spark gets is a ColumnarBatch. But because the scans carry filters from multiple joins, those filters can be retrieved and applied in the code generated at the ColumnToRowExec level, using a new "containsKey" method on HashedRelation. Thus only those rows which satisfy all the BroadcastHashJoins whose keys have been pushed are used for join evaluation. The code is already there; will be opening a PR. For a non-partitioned-table TPCDS run on a laptop with TPCDS data at scale factor 4, I am seeing a 15% gain. For partitioned-table TPCDS, 4 - 5 queries improve to the tune of 10% to 37%.
h2. *Q5. Who cares? If you are successful, what difference will it make?*
If use cases involve multiple joins, especially when the join columns are not partitioned, and performance is a criterion, this PR *might* help.
h2. *Q6. What are the risks?*
The changes are extensive, so review will be painful (if it happens). Though the code is being tested continuously and more tests are being added, with a change this big some possibility of bugs remains; as of now, I think the code is robust. To get the Perf benefit
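A minimal sketch of the row-level filtering described in point 5 above (the [#singleRowFilter] idea): rows decoded from a columnar batch are kept only if every pushed join's key set contains their key. The KeyLookup trait and the surrounding types are assumptions made for illustration; they stand in for the proposed containsKey-style check on HashedRelation and are not the actual Spark classes.
{code:scala}
// Stand-in for the proposed containsKey-style lookup against a broadcasted build side.
trait KeyLookup { def containsKey(key: Long): Boolean }

final class SetLookup(keys: Set[Long]) extends KeyLookup {
  override def containsKey(key: Long): Boolean = keys.contains(key)
}

object ColumnToRowFilterSketch {
  // One (column ordinal -> lookup) pair per BroadcastHashJoin whose keys were pushed to this scan.
  def filterRows(rows: Iterator[Array[Long]], pushed: Seq[(Int, KeyLookup)]): Iterator[Array[Long]] =
    rows.filter(row => pushed.forall { case (ordinal, lookup) => lookup.containsKey(row(ordinal)) })

  def main(args: Array[String]): Unit = {
    val rows = Iterator(Array(1L, 10L), Array(2L, 20L), Array(3L, 30L))
    val pushed = Seq(
      0 -> new SetLookup(Set(1L, 3L)),    // keys of the first join, on column 0
      1 -> new SetLookup(Set(10L, 30L)))  // keys of the second join, on column 1
    // only rows satisfying *all* pushed joins survive: (1,10) and (3,30)
    filterRows(rows, pushed).foreach(r => println(r.mkString(",")))
  }
}
{code}
In the actual proposal this predicate would be emitted by whole-stage codegen at ColumnToRowExec, so the filtering happens while rows are materialized from the batch rather than afterwards.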
[jira] [Resolved] (SPARK-43112) Spark may use a column other than the actual specified partitioning column for partitioning, for Hive format tables
[ https://issues.apache.org/jira/browse/SPARK-43112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif resolved SPARK-43112. -- Resolution: Not A Bug > Spark may use a column other than the actual specified partitioning column > for partitioning, for Hive format tables > > > Key: SPARK-43112 > URL: https://issues.apache.org/jira/browse/SPARK-43112 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.3.1 >Reporter: Asif >Priority: Major > > The class org.apache.spark.sql.catalyst.catalog.HiveTableRelation has its > output method implemented as > // The partition column should always appear after data columns. > override def output: Seq[AttributeReference] = dataCols ++ partitionCols > But the DataWriting commands of spark like InsertIntoHiveDirCommand, expect > that the output from HiveTableRelation is in the order in which the columns > are actually defined in the DDL. > As a result, multiple mismatch scenarios can happen like: > 1) data type casting exception being thrown , even though the data frame > being inserted has schema which is identical to what is used for creating ddl. > OR > 2) Wrong column being used for partitioning , if the datatypes are same or > cast-able, like date type and long > will be creating a PR with the bug test -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-43112) Spark may use a column other than the actual specified partitioning column for partitioning, for Hive format tables
[ https://issues.apache.org/jira/browse/SPARK-43112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17711607#comment-17711607 ] Asif commented on SPARK-43112: -- Open a WIP PR [SPARK-43112|https://github.com/apache/spark/pull/40765/] which has bug tests as of now > Spark may use a column other than the actual specified partitioning column > for partitioning, for Hive format tables > > > Key: SPARK-43112 > URL: https://issues.apache.org/jira/browse/SPARK-43112 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.3.1 >Reporter: Asif >Priority: Critical > > The class org.apache.spark.sql.catalyst.catalog.HiveTableRelation has its > output method implemented as > // The partition column should always appear after data columns. > override def output: Seq[AttributeReference] = dataCols ++ partitionCols > But the DataWriting commands of spark like InsertIntoHiveDirCommand, expect > that the output from HiveTableRelation is in the order in which the columns > are actually defined in the DDL. > As a result, multiple mismatch scenarios can happen like: > 1) data type casting exception being thrown , even though the data frame > being inserted has schema which is identical to what is used for creating ddl. > OR > 2) Wrong column being used for partitioning , if the datatypes are same or > cast-able, like date type and long > will be creating a PR with the bug test -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-43112) Spark may use a column other than the actual specified partitioning column for partitioning, for Hive format tables
[ https://issues.apache.org/jira/browse/SPARK-43112?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-43112: - Description: The class org.apache.spark.sql.catalyst.catalog.HiveTableRelation has its output method implemented as // The partition column should always appear after data columns. override def output: Seq[AttributeReference] = dataCols ++ partitionCols But the DataWriting commands of spark like InsertIntoHiveDirCommand, expect that the output from HiveTableRelation is in the order in which the columns are actually defined in the DDL. As a result, multiple mismatch scenarios can happen like: 1) data type casting exception being thrown , even though the data frame being inserted has schema which is identical to what is used for creating ddl. OR 2) Wrong column being used for partitioning , if the datatypes are same or cast-able, like date type and long will be creating a PR with the bug test was: The class org.apache.spark.sql.catalyst.catalog.HiveTableRelation has its output method implemented as // The partition column should always appear after data columns. override def output: Seq[AttributeReference] = dataCols ++ partitionCols But the DataWriting commands of spark like InsertIntoHiveDirCommand, expect that the out from HiveTableRelation is in the order in which the columns are actually defined in the DDL. As a result, multiple mistmatch scenarios can happen like: 1) data type casting exception being thrown , even though the data frame being inserted has schema which is identical to what is used for creating ddl. OR 2) Wrong column being used for partitioning , if the datatypes are same or castable, like datetype and long will be creating a PR with the bug test > Spark may use a column other than the actual specified partitioning column > for partitioning, for Hive format tables > > > Key: SPARK-43112 > URL: https://issues.apache.org/jira/browse/SPARK-43112 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.3.1 >Reporter: Asif >Priority: Critical > > The class org.apache.spark.sql.catalyst.catalog.HiveTableRelation has its > output method implemented as > // The partition column should always appear after data columns. > override def output: Seq[AttributeReference] = dataCols ++ partitionCols > But the DataWriting commands of spark like InsertIntoHiveDirCommand, expect > that the output from HiveTableRelation is in the order in which the columns > are actually defined in the DDL. > As a result, multiple mismatch scenarios can happen like: > 1) data type casting exception being thrown , even though the data frame > being inserted has schema which is identical to what is used for creating ddl. > OR > 2) Wrong column being used for partitioning , if the datatypes are same or > cast-able, like date type and long > will be creating a PR with the bug test -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-43112) Spark may use a column other than the actual specified partitioning column for partitioning, for Hive format tables
Asif created SPARK-43112: Summary: Spark may use a column other than the actual specified partitioning column for partitioning, for Hive format tables Key: SPARK-43112 URL: https://issues.apache.org/jira/browse/SPARK-43112 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 3.3.1 Reporter: Asif The class org.apache.spark.sql.catalyst.catalog.HiveTableRelation has its output method implemented as // The partition column should always appear after data columns. override def output: Seq[AttributeReference] = dataCols ++ partitionCols But the data-writing commands of Spark, like InsertIntoHiveDirCommand, expect that the output from HiveTableRelation is in the order in which the columns are actually defined in the DDL. As a result, multiple mismatch scenarios can happen, such as: 1) a data type casting exception being thrown even though the data frame being inserted has a schema identical to the one used in the DDL, or 2) the wrong column being used for partitioning if the data types are the same or cast-able, like date type and long. Will be creating a PR with the bug test -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
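A tiny sketch of the ordering mismatch the issue describes, using plain Scala stand-ins rather than Spark's catalog classes (the Col type and the example table layout are assumptions made for illustration):
{code:scala}
case class Col(name: String, dataType: String)

object HivePartitionOrderSketch {
  def main(args: Array[String]): Unit = {
    // Suppose the table DDL declares columns in the order (id, eventDate, payload),
    // with eventDate being the partition column.
    val ddlOrder      = Seq(Col("id", "int"), Col("eventDate", "date"), Col("payload", "string"))
    val dataCols      = ddlOrder.filterNot(_.name == "eventDate")
    val partitionCols = ddlOrder.filter(_.name == "eventDate")

    // What HiveTableRelation.output produces: data columns first, partition columns last.
    val relationOutput = dataCols ++ partitionCols
    println(relationOutput.map(_.name))   // List(id, payload, eventDate)
    println(ddlOrder.map(_.name))         // List(id, eventDate, payload)

    // A writer that assumes DDL order while consuming relationOutput positionally would feed
    // "payload" values into the "eventDate" slot: either a cast error, or (if the types happen
    // to be cast-able) silently partitioning by the wrong column.
  }
}
{code}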
[jira] [Commented] (SPARK-41141) avoid introducing a new aggregate expression in the analysis phase when subquery is referencing it
[ https://issues.apache.org/jira/browse/SPARK-41141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17635989#comment-17635989 ] Asif commented on SPARK-41141: -- Opened the following PR [SPARK-41141-PR|https://github.com/apache/spark/pull/38714/files] > avoid introducing a new aggregate expression in the analysis phase when > subquery is referencing it > -- > > Key: SPARK-41141 > URL: https://issues.apache.org/jira/browse/SPARK-41141 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.3.1 >Reporter: Asif >Priority: Minor > Labels: spark-sql > > Currently the analyzer phase rules on subquery referencing the aggregate > expression in outer query, avoids introducing a new aggregate only for a > single level aggregate function. It introduces new aggregate expression for > nested aggregate functions. > It is possible to avoid adding this extra aggregate expression easily, > atleast if the outer projection involving aggregate function is exactly same > as the one that is used in subquery, or if the outer query's projection > involving aggregate function is a subtree of the subquery's expression. > > Thus consider the following 2 cases: > 1) select cos (sum(a)) , b from t1 group by b having exists (select x from > t2 where y = cos(sum(a)) ) > 2) select sum(a) , b from t1 group by b having exists (select x from t2 > where y = cos(sum(a)) ) > > In both the above cases, there is no need for adding extra aggregate > expression. > > I am also investigating if its possible to avoid if the case is > > 3) select Cos(sum(a)) , b from t1 group by b having exists (select x from > t2 where y = sum(a) ) > > This Jira also is needed for another issue where subquery datasource v2 is > projecting columns which are not needed. ( no Jira filed yet for that, will > do that..) > > Will be opening a PR for this soon.. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-41141) avoid introducing a new aggregate expression in the analysis phase when subquery is referencing it
[ https://issues.apache.org/jira/browse/SPARK-41141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-41141: - Priority: Minor (was: Major) > avoid introducing a new aggregate expression in the analysis phase when > subquery is referencing it > -- > > Key: SPARK-41141 > URL: https://issues.apache.org/jira/browse/SPARK-41141 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.3.1 >Reporter: Asif >Priority: Minor > Labels: spark-sql > > Currently the analyzer phase rules on subquery referencing the aggregate > expression in outer query, avoids introducing a new aggregate only for a > single level aggregate function. It introduces new aggregate expression for > nested aggregate functions. > It is possible to avoid adding this extra aggregate expression easily, > atleast if the outer projection involving aggregate function is exactly same > as the one that is used in subquery, or if the outer query's projection > involving aggregate function is a subtree of the subquery's expression. > > Thus consider the following 2 cases: > 1) select cos (sum(a)) , b from t1 group by b having exists (select x from > t2 where y = cos(sum(a)) ) > 2) select sum(a) , b from t1 group by b having exists (select x from t2 > where y = cos(sum(a)) ) > > In both the above cases, there is no need for adding extra aggregate > expression. > > I am also investigating if its possible to avoid if the case is > > 3) select Cos(sum(a)) , b from t1 group by b having exists (select x from > t2 where y = sum(a) ) > > This Jira also is needed for another issue where subquery datasource v2 is > projecting columns which are not needed. ( no Jira filed yet for that, will > do that..) > > Will be opening a PR for this soon.. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Updated] (SPARK-41141) avoid introducing a new aggregate expression in the analysis phase when subquery is referencing it
[ https://issues.apache.org/jira/browse/SPARK-41141?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-41141: - Description: Currently the analyzer phase rules on subquery referencing the aggregate expression in outer query, avoids introducing a new aggregate only for a single level aggregate function. It introduces new aggregate expression for nested aggregate functions. It is possible to avoid adding this extra aggregate expression easily, atleast if the outer projection involving aggregate function is exactly same as the one that is used in subquery, or if the outer query's projection involving aggregate function is a subtree of the subquery's expression. Thus consider the following 2 cases: 1) select cos (sum(a)) , b from t1 group by b having exists (select x from t2 where y = cos(sum(a)) ) 2) select sum(a) , b from t1 group by b having exists (select x from t2 where y = cos(sum(a)) ) In both the above cases, there is no need for adding extra aggregate expression. I am also investigating if its possible to avoid if the case is 3) select Cos(sum(a)) , b from t1 group by b having exists (select x from t2 where y = sum(a) ) This Jira also is needed for another issue where subquery datasource v2 is projecting columns which are not needed. ( no Jira filed yet for that, will do that..) Will be opening a PR for this soon.. was: Currently the analyzer phase rules on subquery referencing the aggregate expression in outer query, avoids introducing a new aggregate only for a single level aggregate function. It introduces new aggregate expression for nested aggregate functions. It is possible to avoid adding this extra aggregate expression easily, atleast if the outer projection involving aggregate function is exactly same as the one that is used in subquery, or if the outer query's projection involving aggregate function is a subtree of the subquery's expression. Thus consider the following 2 cases: 1) select cos (sum(a)) , b from t1 group by b having exists (select x from t2 where y = cos(sum(a)) ) 2) select sum(a) , b from t1 group by b having exists (select x from t2 where y = cos(sum(a)) ) In both the above cases, there is no need for adding extra aggregate expression. I am also investigating if its possible to avoid if the case is 3) select Cos(sum(a)) , b from t1 group by b having exists (select x from t2 where y = sum(a) ) This Jira also is needed for another issue where subquery datasource v2 is projecting columns which are not needed. ( no Jira filed yet for that, will do that..) > avoid introducing a new aggregate expression in the analysis phase when > subquery is referencing it > -- > > Key: SPARK-41141 > URL: https://issues.apache.org/jira/browse/SPARK-41141 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 3.3.1 >Reporter: Asif >Priority: Major > Labels: spark-sql > > Currently the analyzer phase rules on subquery referencing the aggregate > expression in outer query, avoids introducing a new aggregate only for a > single level aggregate function. It introduces new aggregate expression for > nested aggregate functions. > It is possible to avoid adding this extra aggregate expression easily, > atleast if the outer projection involving aggregate function is exactly same > as the one that is used in subquery, or if the outer query's projection > involving aggregate function is a subtree of the subquery's expression. 
> > Thus consider the following 2 cases: > 1) select cos (sum(a)) , b from t1 group by b having exists (select x from > t2 where y = cos(sum(a)) ) > 2) select sum(a) , b from t1 group by b having exists (select x from t2 > where y = cos(sum(a)) ) > > In both the above cases, there is no need for adding extra aggregate > expression. > > I am also investigating if its possible to avoid if the case is > > 3) select Cos(sum(a)) , b from t1 group by b having exists (select x from > t2 where y = sum(a) ) > > This Jira also is needed for another issue where subquery datasource v2 is > projecting columns which are not needed. ( no Jira filed yet for that, will > do that..) > > Will be opening a PR for this soon.. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Created] (SPARK-41141) avoid introducing a new aggregate expression in the analysis phase when subquery is referencing it
Asif created SPARK-41141: Summary: avoid introducing a new aggregate expression in the analysis phase when subquery is referencing it Key: SPARK-41141 URL: https://issues.apache.org/jira/browse/SPARK-41141 Project: Spark Issue Type: Improvement Components: SQL Affects Versions: 3.3.1 Reporter: Asif Currently the analyzer-phase rules for a subquery referencing an aggregate expression in the outer query avoid introducing a new aggregate expression only for a single-level aggregate function; they introduce a new aggregate expression for nested aggregate functions. It is possible to avoid adding this extra aggregate expression, at least when the outer projection involving the aggregate function is exactly the same as the one used in the subquery, or when the outer query's projection involving the aggregate function is a subtree of the subquery's expression. Thus consider the following 2 cases: 1) select cos (sum(a)) , b from t1 group by b having exists (select x from t2 where y = cos(sum(a)) ) 2) select sum(a) , b from t1 group by b having exists (select x from t2 where y = cos(sum(a)) ) In both of the above cases, there is no need to add an extra aggregate expression. I am also investigating whether it is possible to avoid it for the case 3) select Cos(sum(a)) , b from t1 group by b having exists (select x from t2 where y = sum(a) ) This Jira is also needed for another issue where a subquery over DataSource V2 projects columns which are not needed. (No Jira filed yet for that; will do that.) -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
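A compact sketch of the reuse condition argued for above, i.e. "the outer aggregate expression equals, or is a subtree of, the subquery's expression". The miniature expression ADT and the isSubtree helper are assumptions made for illustration; they are not the analyzer's actual representation.
{code:scala}
object AggregateReuseSketch {
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class Sum(child: Expr)   extends Expr
  case class Cos(child: Expr)   extends Expr

  // true if `outer` appears as a subtree of `sub` (including being equal to it)
  def isSubtree(outer: Expr, sub: Expr): Boolean =
    outer == sub || (sub match {
      case Sum(c) => isSubtree(outer, c)
      case Cos(c) => isSubtree(outer, c)
      case _      => false
    })

  def main(args: Array[String]): Unit = {
    val sumA = Sum(Attr("a"))
    // case 1: outer projects cos(sum(a)), subquery compares against cos(sum(a)) -> reusable
    println(isSubtree(Cos(sumA), Cos(sumA)))   // true
    // case 2: outer projects sum(a), subquery uses cos(sum(a)) -> sum(a) is a subtree -> reusable
    println(isSubtree(sumA, Cos(sumA)))        // true
    // case 3: outer projects cos(sum(a)), subquery uses sum(a) -> not covered by this check
    println(isSubtree(Cos(sumA), sumA))        // false
  }
}
{code}
Cases 1 and 2 correspond to the two queries above where no extra aggregate expression is needed; case 3 is the situation still under investigation.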
[jira] [Commented] (SPARK-33152) SPIP: Constraint Propagation code causes OOM issues or increasing compilation time to hours
[ https://issues.apache.org/jira/browse/SPARK-33152?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17606817#comment-17606817 ] Asif commented on SPARK-33152: -- Added a test *CompareNewAndOldConstraintsSuite* in the PR which when run on master will highlight functionality issues with master as well as perf issue. > SPIP: Constraint Propagation code causes OOM issues or increasing compilation > time to hours > --- > > Key: SPARK-33152 > URL: https://issues.apache.org/jira/browse/SPARK-33152 > Project: Spark > Issue Type: Improvement > Components: SQL >Affects Versions: 2.4.0, 3.0.1, 3.1.2 >Reporter: Asif >Priority: Major > Labels: SPIP > Original Estimate: 168h > Remaining Estimate: 168h > > h2. Q1. What are you trying to do? Articulate your objectives using > absolutely no jargon. > Proposing new algorithm to create, store and use constraints for removing > redundant filters & inferring new filters. > The current algorithm has subpar performance in complex expression scenarios > involving aliases( with certain use cases the compilation time can go into > hours), potential to cause OOM, may miss removing redundant filters in > different scenarios, may miss creating IsNotNull constraints in different > scenarios, does not push compound predicates in Join. > # This issue if not fixed can cause OutOfMemory issue or unacceptable query > compilation times. > Have added a test "plan equivalence with case statements and performance > comparison with benefit of more than 10x conservatively" in > org.apache.spark.sql.catalyst.plans.OptimizedConstraintPropagationSuite. > *With this PR the compilation time is 247 ms vs 13958 ms without the change* > # It is more effective in filter pruning as is evident in some of the tests > in org.apache.spark.sql.catalyst.plans.OptimizedConstraintPropagationSuite > where current code is not able to identify the redundant filter in some cases. > # It is able to generate a better optimized plan for join queries as it can > push compound predicates. > # The current logic can miss a lot of possible cases of removing redundant > predicates, as it fails to take into account if same attribute or its aliases > are repeated multiple times in a complex expression. > # There are cases where some of the optimizer rules involving removal of > redundant predicates fail to remove on the basis of constraint data. In some > cases the rule works, just by the virtue of previous rules helping it out to > cover the inaccuracy. That the ConstraintPropagation rule & its function of > removal of redundant filters & addition of new inferred filters is dependent > on the working of some of the other unrelated previous optimizer rules is > behaving, is indicative of issues. > # It does away with all the EqualNullSafe constraints as this logic does not > need those constraints to be created. > # There is at least one test in existing ConstraintPropagationSuite which is > missing a IsNotNull constraints because the code incorrectly generated a > EqualsNullSafeConstraint instead of EqualTo constraint, when using the > existing Constraints code. With these changes, the test correctly creates an > EqualTo constraint, resulting in an inferred IsNotNull constraint > # It does away with the current combinatorial logic of evaluation all the > constraints can cause compilation to run into hours or cause OOM. The number > of constraints stored is exactly the same as the number of filters encountered > h2. Q2. What problem is this proposal NOT designed to solve? 
> It mainly focuses on compile time performance, but in some cases can benefit > run time characteristics too, like inferring IsNotNull filter or pushing down > compound predicates on the join, which currently may get missed/ does not > happen , respectively, by the present code. > h2. Q3. How is it done today, and what are the limits of current practice? > Current ConstraintsPropagation code, pessimistically tries to generates all > the possible combinations of constraints , based on the aliases ( even then > it may miss a lot of combinations if the expression is a complex expression > involving same attribute repeated multiple times within the expression and > there are many aliases to that column). There are query plans in our > production env, which can result in intermediate number of constraints going > into hundreds of thousands, causing OOM or taking time running into hours. > Also there are cases where it incorrectly generates an EqualNullSafe > constraint instead of EqualTo constraint , thus missing a possible IsNull > constraint on column. > Also it only pushes single column predicate on the other side of the join. > The constraints generated , in
[jira] [Updated] (SPARK-33152) SPIP: Constraint Propagation code causes OOM issues or increasing compilation time to hours
[ https://issues.apache.org/jira/browse/SPARK-33152?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-33152: - Shepherd: Wenchen Fan (was: Arnaud Doucet) Description:
h2. Q1. What are you trying to do? Articulate your objectives using absolutely no jargon.
Proposing a new algorithm to create, store and use constraints for removing redundant filters and inferring new filters. The current algorithm has subpar performance in complex expression scenarios involving aliases (with certain use cases the compilation time can go into hours), has the potential to cause OOM, may miss removing redundant filters in different scenarios, may miss creating IsNotNull constraints in different scenarios, and does not push compound predicates into Join.
# If not fixed, this issue can cause OutOfMemory errors or unacceptable query compilation times. Have added a test "plan equivalence with case statements and performance comparison with benefit of more than 10x conservatively" in org.apache.spark.sql.catalyst.plans.OptimizedConstraintPropagationSuite. *With this PR the compilation time is 247 ms vs 13958 ms without the change*
# It is more effective in filter pruning, as is evident in some of the tests in org.apache.spark.sql.catalyst.plans.OptimizedConstraintPropagationSuite, where the current code is not able to identify the redundant filter in some cases.
# It is able to generate a better optimized plan for join queries, as it can push compound predicates.
# The current logic can miss many possible cases of removing redundant predicates, as it fails to take into account whether the same attribute or its aliases are repeated multiple times in a complex expression.
# There are cases where some of the optimizer rules involving removal of redundant predicates fail to remove them on the basis of constraint data. In some cases the rule works only by virtue of previous rules covering the inaccuracy. That the ConstraintPropagation rule, and its function of removing redundant filters and adding newly inferred filters, depends on the behaviour of other, unrelated, earlier optimizer rules is itself indicative of issues.
# It does away with all the EqualNullSafe constraints, as this logic does not need those constraints to be created.
# There is at least one test in the existing ConstraintPropagationSuite which is missing an IsNotNull constraint because the code incorrectly generated an EqualNullSafe constraint instead of an EqualTo constraint when using the existing Constraints code. With these changes, the test correctly creates an EqualTo constraint, resulting in an inferred IsNotNull constraint.
# It does away with the current combinatorial logic of evaluating all the constraints, which can cause compilation to run into hours or cause OOM. The number of constraints stored is exactly the same as the number of filters encountered.
h2. Q2. What problem is this proposal NOT designed to solve?
It mainly focuses on compile-time performance, but in some cases it can benefit run-time characteristics too, such as inferring an IsNotNull filter or pushing down compound predicates on the join, which the present code may miss or does not do, respectively.
h2. Q3. How is it done today, and what are the limits of current practice?
Current ConstraintsPropagation code pessimistically tries to generate all the possible combinations of constraints based on the aliases (even then it may miss a lot of combinations if the expression is a complex expression involving the same attribute repeated multiple times, and there are many aliases to that column). There are query plans in our production environment which can result in the intermediate number of constraints going into hundreds of thousands, causing OOM or taking time running into hours. There are also cases where it incorrectly generates an EqualNullSafe constraint instead of an EqualTo constraint, thus missing a possible IsNotNull constraint on a column. Also, it only pushes a single-column predicate to the other side of the join. The constraints generated are, in some cases, missing the required ones, and the plan apparently behaves correctly only due to a preceding, unrelated optimizer rule. Have a test which shows that with the bare minimum rules containing RemoveRedundantPredicate, it misses the removal of a redundant predicate.
h2. Q4. What is new in your approach and why do you think it will be successful?
It solves all the above-mentioned issues.
# The number of constraints created is the same as the number of filters. No combinatorial creation of constraints. No need for an EqualNullSafe constraint on aliases.
# Can remove redundant predicates on any expression involving aliases, irrespective of the number of repeat occurrences, in all possible combinations.
# Brings down query compilation time to a few minutes from hours.
# Can push compound predicates on
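A back-of-the-envelope sketch of the combinatorial blow-up described in Q3, using made-up alias counts. The formula is an illustrative simplification of "one rewritten constraint per alias substitution", not the exact behaviour of the existing ConstraintsPropagation code.
{code:scala}
object ConstraintExplosionSketch {
  // If a constraint references an attribute `occurrences` times and the attribute has `aliases`
  // aliases, a pessimistic expansion that tries every substitution produces
  // (aliases + 1) ^ occurrences variants for that attribute alone.
  def variantsPerAttribute(aliases: Int, occurrences: Int): Long =
    math.pow(aliases + 1, occurrences).toLong

  def main(args: Array[String]): Unit = {
    // e.g. a CASE-heavy predicate mentioning attribute `a` 4 times, with 9 aliases of `a`
    val perAttr = variantsPerAttribute(aliases = 9, occurrences = 4)  // 10^4 = 10000
    // a second attribute with the same shape multiplies the count
    println(perAttr * perAttr)                                        // 100000000 candidate rewrites
    // the proposed scheme instead stores one canonical constraint per filter plus an alias
    // mapping, i.e. O(number of filters)
  }
}
{code}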
[jira] [Updated] (SPARK-40362) Bug in Canonicalization of expressions like Add & Multiply i.e Commutative Operators
[ https://issues.apache.org/jira/browse/SPARK-40362?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Asif updated SPARK-40362: - Description: In the canonicalization code, which is now in two stages, canonicalization involving commutative operators is broken if they are subexpressions of certain types of expressions which override precanonicalize, for example BinaryComparison. Consider the following expression: a + b > 10, i.e. GreaterThan(Add(a, b), Literal(10)). In precanonicalize, the BinaryComparison operator first precanonicalizes its children and may then swap the operands based on the left/right hashCode inequality. Let's say Add(a, b).hashCode is greater than Literal(10).hashCode; as a result GT is converted to LT. But if the same tree is created as GreaterThan(Add(b, a), Literal(10)), the hashCode of Add(b, a) is not the same as that of Add(a, b), so it is possible that for this tree Add(b, a).hashCode is less than Literal(10).hashCode, in which case GT remains as is. Thus two similar trees result in different canonicalizations, one having GT, the other having LT. The problem occurs because canonicalization normalizes a commutative expression to a consistent hashCode, which is not the case with precanonicalize: the hashCodes of a commutative expression's precanonicalized and canonicalized forms are different.
The test
{quote}test("bug X") {
val tr1 = LocalRelation('c.int, 'b.string, 'a.int)
val y = tr1.where('a.attr + 'c.attr > 10).analyze
val fullCond = y.asInstanceOf[Filter].condition.clone()
val addExpr = (fullCond match { case GreaterThan(x, _) => x }).clone().asInstanceOf[Add]
val canonicalizedFullCond = fullCond.canonicalized
// swap the operands of add
val newAddExpr = Add(addExpr.right, addExpr.left)
// build a new condition which is same as the previous one, but with operands of
// Add reversed
val builtCondnCanonicalized = GreaterThan(newAddExpr, Literal(10)).canonicalized
assertEquals(canonicalizedFullCond, builtCondnCanonicalized)
}
{quote}
This test fails. The fix which I propose is that for commutative expressions, precanonicalize should be overridden and Canonicalize.reorderCommutativeOperators be invoked on the expression there instead of in canonicalize; effectively, for commutative operators (Add, Or, Multiply, And etc.) canonicalize and precanonicalize should be the same. PR: [https://github.com/apache/spark/pull/37824] I am also trying a better fix, whereby the idea is that for commutative expressions the murmur hashCodes are calculated using unorderedHash so that they are order independent (i.e. symmetric). The above approach works fine, but in the case of Least & Greatest, the Product's element is a Seq, and that messes with the consistency of the hashCode. was: In the canonicalization code which is now in two stages, canonicalization involving Commutative operators is broken, if they are subexpressions of certain type of expressions which override precanonicalize. Consider following expression: a + b > 10 This GreaterThan expression when canonicalized as a whole for first time, will skip the call to Canonicalize.reorderCommutativeOperators for the Add expression as the GreaterThan's canonicalization used precanonicalize on children ( the Add expression). so if create a new expression b + a > 10 and invoke canonicalize it, the canonicalized versions of these two expressions will not match.
The test
{quote}test("bug X") {
val tr1 = LocalRelation('c.int, 'b.string, 'a.int)
val y = tr1.where('a.attr + 'c.attr > 10).analyze
val fullCond = y.asInstanceOf[Filter].condition.clone()
val addExpr = (fullCond match { case GreaterThan(x, _) => x }).clone().asInstanceOf[Add]
val canonicalizedFullCond = fullCond.canonicalized
// swap the operands of add
val newAddExpr = Add(addExpr.right, addExpr.left)
// build a new condition which is same as the previous one, but with operands of
// Add reversed
val builtCondnCanonicalized = GreaterThan(newAddExpr, Literal(10)).canonicalized
assertEquals(canonicalizedFullCond, builtCondnCanonicalized)
}
{quote}
This test fails. The fix which I propose is that for the commutative expressions, the precanonicalize should be overridden and Canonicalize.reorderCommutativeOperators be invoked on the expression instead of at place of canonicalize. effectively for commutative operands ( add, or , multiply , and etc) canonicalize and precanonicalize should be same. PR: https://github.com/apache/spark/pull/37824 > Bug in Canonicalization of expressions like Add & Multiply i.e Commutative > Operators > > > Key: SPARK-40362 > URL: https://issues.apache.org/jira/browse/SPARK-40362 > Project: Spark > Issue Type: Bug > Components: SQL >Affects Versions: 3.3.0 >
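A minimal sketch of the order-independent hashing idea mentioned in the updated description above (computing a commutative node's hash from an unordered combination of its children's hashes). The SimpleExpr hierarchy is an assumption made for illustration and is unrelated to Spark's Expression or Murmur3 code.
{code:scala}
object CommutativeHashSketch {
  sealed trait SimpleExpr { def semanticHash: Int }

  case class Leaf(name: String) extends SimpleExpr {
    override def semanticHash: Int = name.hashCode
  }

  // A commutative node combines child hashes with an order-insensitive reduction (a sum here),
  // so Add(a, b) and Add(b, a) hash identically and operand swapping during canonicalization
  // produces a consistent result regardless of the original operand order.
  case class AddExpr(children: Seq[SimpleExpr]) extends SimpleExpr {
    override def semanticHash: Int = "Add".hashCode ^ children.map(_.semanticHash).sum
  }

  def main(args: Array[String]): Unit = {
    val ab = AddExpr(Seq(Leaf("a"), Leaf("b")))
    val ba = AddExpr(Seq(Leaf("b"), Leaf("a")))
    println(ab.semanticHash == ba.semanticHash)   // true: operand order no longer matters
  }
}
{code}
As the description notes, the wrinkle for expressions like Least and Greatest is that their children live in a Seq inside the case class Product, so an ordered hash over the Product would still be order sensitive; an unordered reduction over the children sidesteps that.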