Re: [PR] [SPARK-45924][SQL] Fixing the canonicalization of SubqueryAdaptiveBroadcastExec and making it equivalent with SubqueryBroadcastExec [spark]
ahshahid closed pull request #43806: [SPARK-45924][SQL] Fixing the canonicalization of SubqueryAdaptiveBroadcastExec and making it equivalent with SubqueryBroadcastExec URL: https://github.com/apache/spark/pull/43806 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-45924][SQL] Fixing the canonicalization of SubqueryAdaptiveBroadcastExec and making it equivalent with SubqueryBroadcastExec [spark]
ahshahid commented on PR #43806: URL: https://github.com/apache/spark/pull/43806#issuecomment-1813288994 @beliefer I think you may be right. In my other PR for broadcast-var-pushdown, I am seeing unmodified SubqueryAdaptiveBroadcastExec nodes in the stage cache's keys. Maybe it is an issue in my code or something else; I will check my code again. So as of now, I think it makes sense to close this PR and also the other PR on SubqueryBroadcastHashExec.
Re: [PR] [SPARK-45924][SQL] Fixing the canonicalization of SubqueryAdaptiveBroadcastExec and making it equivalent with SubqueryBroadcastExec [spark]
beliefer commented on PR #43806: URL: https://github.com/apache/spark/pull/43806#issuecomment-1812439348
> Ideally that should have happened, but what I see is one stage containing the subquery adaptive exec while the incoming exchange contains the subquery exec. Also, this is just one of the issues. The main PR will be the one which requires additional functions to be implemented by DataSourceV2impl. Please refer to the ticket which depends on this PR. Maybe this itself can be fixed by ensuring the incoming exchange contains the right exec. Then tinkering with canonicalize may not be needed. But in that case the buildPlan should be canonicalized.
That is impossible. `SubqueryAdaptiveBroadcastExec` will be eliminated with `DynamicPruningExpression`.
Re: [PR] [SPARK-45924][SQL] Fixing the canonicalization of SubqueryAdaptiveBroadcastExec and making it equivalent with SubqueryBroadcastExec [spark]
ahshahid commented on PR #43806: URL: https://github.com/apache/spark/pull/43806#issuecomment-1812050963
> AFAIK, `SubqueryAdaptiveBroadcastExec` is only used for dynamic partition pruning. `SubqueryAdaptiveBroadcastExec` will be replaced with `SubqueryBroadcastExec`, and the latter must reuse the broadcast exchange.
Ideally that should have happened, but what I see is one stage containing the subquery adaptive exec while the incoming exchange contains the subquery exec. Also, this is just one of the issues. The main PR will be the one which requires additional functions to be implemented by DataSourceV2impl. Please refer to the ticket which depends on this PR. Maybe this itself can be fixed by ensuring the incoming exchange contains the right exec. Then tinkering with canonicalize may not be needed. But in that case the buildPlan should be canonicalized.
Re: [PR] [SPARK-45924][SQL] Fixing the canonicalization of SubqueryAdaptiveBroadcastExec and making it equivalent with SubqueryBroadcastExec [spark]
beliefer commented on code in PR #43806: URL: https://github.com/apache/spark/pull/43806#discussion_r1393856527
## sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryAdaptiveBroadcastExec.scala:
## @@ -46,7 +46,8 @@ case class SubqueryAdaptiveBroadcastExec(
   protected override def doCanonicalize(): SparkPlan = {
     val keys = buildKeys.map(k => QueryPlan.normalizeExpressions(k, child.output))
-    copy(name = "dpp", buildKeys = keys, child = child.canonicalized)
+    SubqueryBroadcastExec(name = "dpp", index = index, buildKeys = keys,
Review Comment: The implementation seems to go somewhat against the design.
Re: [PR] [SPARK-45924][SQL] Fixing the canonicalization of SubqueryAdaptiveBroadcastExec and making it equivalent with SubqueryBroadcastExec [spark]
beliefer commented on PR #43806: URL: https://github.com/apache/spark/pull/43806#issuecomment-1812033293 AFAIK, `SubqueryAdaptiveBroadcastExec` is only used for dynamic partition pruning. `SubqueryAdaptiveBroadcastExec` will be replaced with `SubqueryBroadcastExec`, and the latter must reuse the broadcast exchange.
Re: [PR] [SPARK-45924][SQL] Fixing the canonicalization of SubqueryAdaptiveBroadcastExec and making it equivalent with SubqueryBroadcastExec [spark]
ahshahid commented on PR #43806: URL: https://github.com/apache/spark/pull/43806#issuecomment-1811971501 I have reworked the PR to just canonicalize SubqueryAdaptiveBroadcastExec as SubqueryBroadcastExec. This also fixes the exchange-reuse issue and seems to be a less invasive change. It also means that PR [SPARK-45925](https://github.com/apache/spark/pull/43807) can be closed.
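A minimal sketch of the reworked idea, using hypothetical toy case classes (`Plan`, `Scan`, `AdaptiveSubquery`, `BroadcastSubquery`, `PrunedScan`) rather than Spark's real `SparkPlan` nodes: the adaptive node canonicalizes to the broadcast node's type, so two stages that differ only in which subquery node they embed produce equal canonical keys, and a cache keyed by those keys reports a hit. It assumes both subqueries wrap the same child, which, as questioned elsewhere in this thread, may not hold in Spark. Written as a Scala script (top-level statements).

```scala
import scala.collection.mutable

// Hypothetical toy plan nodes; NOT Spark's SparkPlan classes.
sealed trait Plan { def canonicalized: Plan = this }
final case class Scan(table: String) extends Plan

final case class BroadcastSubquery(name: String, index: Int, buildKeys: Seq[String], child: Plan) extends Plan {
  override def canonicalized: Plan = copy(name = "dpp", child = child.canonicalized)
}

final case class AdaptiveSubquery(name: String, index: Int, buildKeys: Seq[String], child: Plan) extends Plan {
  // The reworked idea: canonicalize to the *other* node type.
  override def canonicalized: Plan =
    BroadcastSubquery(name = "dpp", index = index, buildKeys = buildKeys, child = child.canonicalized)
}

// A scan whose pruning filter embeds a subquery node.
final case class PrunedScan(table: String, pruning: Plan) extends Plan {
  override def canonicalized: Plan = copy(pruning = pruning.canonicalized)
}

// Toy stage cache keyed by the canonicalized plan.
val stageCache = mutable.HashMap.empty[Plan, String]
def registerOrReuse(stage: Plan, stageId: String): String =
  stageCache.getOrElseUpdate(stage.canonicalized, stageId)

val dim = Scan("dim")
val first = registerOrReuse(PrunedScan("fact", AdaptiveSubquery("sab", 0, Seq("k"), dim)), "stage-1")
val second = registerOrReuse(PrunedScan("fact", BroadcastSubquery("sb", 0, Seq("k"), dim)), "stage-2")

println(s"$first $second") // prints "stage-1 stage-1": the second stage reuses the first
```

The design choice here is that no custom `equals` is needed: plain case-class structural equality does the work once both nodes share one canonical type.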
Re: [PR] [SPARK-45924][SQL] Fixing the canonicalization of SubqueryAdaptiveBroadcastExec and making it equivalent with SubqueryBroadcastExec [spark]
ahshahid commented on PR #43806: URL: https://github.com/apache/spark/pull/43806#issuecomment-1811817310 The other option is to make the canonicalized form of both SubqueryAdaptiveBroadcastExec and SubqueryBroadcastExec be of type SubqueryBroadcastExec. That way the custom equals and hashCode overrides go away, and it will serve our purpose.
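To illustrate why dropping the custom `equals`/`hashCode` is attractive, here is a toy sketch with hypothetical classes (not Spark's) of a one-sided cross-type `equals` override, similar in spirit to the one in the diff quoted in this thread. The overriding class accepts the other type, but the other type keeps its compiler-generated case-class `equals`, so equality is not symmetric. Canonicalizing both nodes to a single type sidesteps this, since same-class structural equality is symmetric.

```scala
// Hypothetical toy classes, loosely modelled on the equals override discussed in the PR.
sealed trait Plan

final case class BroadcastSubquery(name: String, buildKeys: Seq[String]) extends Plan

final case class AdaptiveSubquery(name: String, buildKeys: Seq[String]) extends Plan {
  // One-sided override: also accepts a BroadcastSubquery with the same fields.
  override def equals(other: Any): Boolean = other match {
    case x: AdaptiveSubquery  => name == x.name && buildKeys == x.buildKeys
    case y: BroadcastSubquery => name == y.name && buildKeys == y.buildKeys
    case _                    => false
  }
  override def hashCode: Int = (name, buildKeys).##
}

val a: Plan = AdaptiveSubquery("dpp", Seq("k"))
val b: Plan = BroadcastSubquery("dpp", Seq("k"))

val forward = a == b  // true: dispatches to AdaptiveSubquery.equals
val backward = b == a // false: BroadcastSubquery keeps the generated equals

println(s"a == b: $forward, b == a: $backward") // prints "a == b: true, b == a: false"
```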
Re: [PR] [SPARK-45924][SQL] Fixing the canonicalization of SubqueryAdaptiveBroadcastExec and making it equivalent with SubqueryBroadcastExec [spark]
ahshahid commented on code in PR #43806: URL: https://github.com/apache/spark/pull/43806#discussion_r1393590377
## sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryAdaptiveBroadcastExec.scala:
## @@ -44,9 +46,21 @@ case class SubqueryAdaptiveBroadcastExec(
     throw QueryExecutionErrors.executeCodePathUnsupportedError("SubqueryAdaptiveBroadcastExec")
   }
+  override def equals(other: Any): Boolean = other match {
+    case x: SubqueryAdaptiveBroadcastExec => this.name == x.name && this.index == x.index &&
+      this.onlyInBroadcast == x.onlyInBroadcast && this.buildPlan == x.buildPlan &&
+      this.buildKeys == x.buildKeys && this.child == x.child
+    case y: SubqueryBroadcastExec => this.name == y.name && this.index == y.index &&
+      this.buildKeys == y.buildKeys && this.child == y.child
Review Comment: I don't think that would work as such, and it may actually complicate things. The issue lies in the following code in AdaptiveSparkPlanExec, where we look for an existing stage to reuse:

    if (conf.exchangeReuseEnabled) {
      // Check the `stageCache` again for reuse. If a match is found, ditch the new stage
      // and reuse the existing stage found in the `stageCache`, otherwise update the
      // `stageCache` with the new stage.
      val queryStage = context.stageCache.getOrElseUpdate(
        newStage.plan.canonicalized, newStage)
      if (queryStage.ne(newStage)) {
        newStage = reuseQueryStage(queryStage, e)
      }
    }

The canonicalized plan in the cache may contain an InSubqueryExec plan which contains SubqueryAdaptiveBroadcastExec, while the incoming canonicalized plan contains SubqueryBroadcastExec in place of SubqueryAdaptiveBroadcastExec, hence the mismatch. Any replacement would potentially complicate things and cause performance issues too, because these nodes, though extending SparkPlan, are actually embedded in ExecSubqueryExpression. That means checking every expression in all the SparkPlan nodes to modify, and then potentially adjusting the logical plan tags too.
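A toy model of the lookup described above, assuming a `mutable.HashMap` keyed by canonicalized plans; Spark's real `stageCache` type and `SparkPlan` canonicalization are not reproduced, and all class names are hypothetical. The two stages are identical except that one embeds the adaptive subquery node and the other the broadcast one, so their canonical keys differ and `getOrElseUpdate` registers a second stage instead of returning the first.

```scala
import scala.collection.mutable

// Hypothetical toy plan nodes; these are NOT Spark's SparkPlan classes.
sealed trait Plan { def canonicalized: Plan = this }
final case class Scan(table: String) extends Plan

// Stand-ins for SubqueryAdaptiveBroadcastExec and SubqueryBroadcastExec.
final case class AdaptiveSubquery(name: String, buildKeys: Seq[String], child: Plan) extends Plan {
  override def canonicalized: Plan = copy(name = "dpp", child = child.canonicalized)
}
final case class BroadcastSubquery(name: String, buildKeys: Seq[String], child: Plan) extends Plan {
  override def canonicalized: Plan = copy(name = "dpp", child = child.canonicalized)
}

// A scan whose pruning filter embeds a subquery node (think InSubqueryExec).
final case class PrunedScan(table: String, pruning: Plan) extends Plan {
  override def canonicalized: Plan = copy(pruning = pruning.canonicalized)
}

// Toy stage cache keyed by canonicalized plan, mirroring the getOrElseUpdate call quoted above.
val stageCache = mutable.HashMap.empty[Plan, String]
def registerOrReuse(stage: Plan, stageId: String): String =
  stageCache.getOrElseUpdate(stage.canonicalized, stageId)

val dim = Scan("dim")
val first = registerOrReuse(PrunedScan("fact", AdaptiveSubquery("sab", Seq("k"), dim)), "stage-1")
val second = registerOrReuse(PrunedScan("fact", BroadcastSubquery("sb", Seq("k"), dim)), "stage-2")

// The canonical keys differ only in the subquery node's type, so the lookup misses.
println(s"$first $second") // prints "stage-1 stage-2"
```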
Re: [PR] [SPARK-45924][SQL] Fixing the canonicalization of SubqueryAdaptiveBroadcastExec and making it equivalent with SubqueryBroadcastExec [spark]
ulysses-you commented on code in PR #43806: URL: https://github.com/apache/spark/pull/43806#discussion_r1393580889
## sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryAdaptiveBroadcastExec.scala:
## @@ -44,9 +46,21 @@ case class SubqueryAdaptiveBroadcastExec(
     throw QueryExecutionErrors.executeCodePathUnsupportedError("SubqueryAdaptiveBroadcastExec")
   }
+  override def equals(other: Any): Boolean = other match {
+    case x: SubqueryAdaptiveBroadcastExec => this.name == x.name && this.index == x.index &&
+      this.onlyInBroadcast == x.onlyInBroadcast && this.buildPlan == x.buildPlan &&
+      this.buildKeys == x.buildKeys && this.child == x.child
+    case y: SubqueryBroadcastExec => this.name == y.name && this.index == y.index &&
+      this.buildKeys == y.buildKeys && this.child == y.child
Review Comment: IIUC, when we replace SubqueryAdaptiveBroadcastExec with SubqueryBroadcastExec, we would add a broadcast exchange on top of the child, and then AQE would reuse the broadcast exchange. So can `SubqueryAdaptiveBroadcastExec.child` be the same as `SubqueryBroadcastExec.child` (the broadcast exchange)?
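A hypothetical sketch of the question above, under the assumption stated in the comment that the replacement wraps the child in a broadcast exchange. With toy case classes standing in for the real nodes, the replaced node's child is `BroadcastExchange(child)`, so comparing the two `child` fields directly fails; they only line up if the exchange is unwrapped first. This illustrates the concern only and does not reproduce how Spark actually performs the replacement.

```scala
// Hypothetical toy nodes standing in for the real physical plan classes.
sealed trait Plan
final case class Scan(table: String) extends Plan
final case class BroadcastExchange(child: Plan) extends Plan
final case class AdaptiveSubquery(buildKeys: Seq[String], child: Plan) extends Plan
final case class BroadcastSubquery(buildKeys: Seq[String], child: Plan) extends Plan

// Toy replacement step: keep the build keys, wrap the child in a broadcast exchange.
def replaceWithBroadcast(sab: AdaptiveSubquery): BroadcastSubquery =
  BroadcastSubquery(sab.buildKeys, BroadcastExchange(sab.child))

val sab = AdaptiveSubquery(Seq("k"), Scan("dim"))
val sb = replaceWithBroadcast(sab)

// Direct comparison: Scan("dim") vs BroadcastExchange(Scan("dim")).
val childrenEqual = sab.child == sb.child

// Comparison after stripping the broadcast exchange.
val unwrappedEqual = sb.child match {
  case BroadcastExchange(inner) => inner == sab.child
  case other                    => other == sab.child
}

println(s"direct: $childrenEqual, unwrapped: $unwrappedEqual") // prints "direct: false, unwrapped: true"
```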
Re: [PR] [SPARK-45924][SQL] Fixing the canonicalization of SubqueryAdaptiveBroadcastExec and making it equivalent with SubqueryBroadcastExec [spark]
HyukjinKwon commented on PR #43806: URL: https://github.com/apache/spark/pull/43806#issuecomment-1811687115 cc @peter-toth and @ulysses-you FYI