Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]
agubichev commented on code in PR #45125: URL: https://github.com/apache/spark/pull/45125#discussion_r1511830190 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala: ## @@ -328,6 +328,30 @@ abstract class Optimizer(catalogManager: CatalogManager) // Do not optimize DPP subquery, as it was created from optimized plan and we should not // optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse. case d: DynamicPruningSubquery => d + case s@ScalarSubquery(a@Aggregate(group, _, child), _, _, _, _, mayHaveCountBug) +if mayHaveCountBug.nonEmpty && mayHaveCountBug.get => +// This is a subquery with an aggregate that may suffer from a COUNT bug. +// Detailed COUNT bug detection is done at a later stage (e.g. in +// RewriteCorrelatedScalarSubquery). +// Make sure that the output plan always has the same aggregate node +// (i.e., it is not being constant folded). +// Note that this does not limit optimization opportunities for the subquery: after +// decorrelation is done, the subquery's body becomes part of the main plan and all +// optimization rules are applied again. +val groupRefs = group.flatMap(x => x.references) +val projectOverSubqueryBody = Project(groupRefs ++ a.references.toSeq, child) Review Comment: renamed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]
agubichev commented on PR #45125: URL: https://github.com/apache/spark/pull/45125#issuecomment-1977503018

> What about if there's another node above the aggregate in the subquery, such as a filter after the aggregate (having clause)?

added a test, but any non-trivial node above the aggregate (such as a filter) results in a DomainJoin, so constant folding does not kick in.
Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]
agubichev commented on code in PR #45125: URL: https://github.com/apache/spark/pull/45125#discussion_r1511831412 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala: ## @@ -328,6 +328,30 @@ abstract class Optimizer(catalogManager: CatalogManager) // Do not optimize DPP subquery, as it was created from optimized plan and we should not // optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse. case d: DynamicPruningSubquery => d + case s@ScalarSubquery(a@Aggregate(group, _, child), _, _, _, _, mayHaveCountBug) +if mayHaveCountBug.nonEmpty && mayHaveCountBug.get => +// This is a subquery with an aggregate that may suffer from a COUNT bug. +// Detailed COUNT bug detection is done at a later stage (e.g. in +// RewriteCorrelatedScalarSubquery). +// Make sure that the output plan always has the same aggregate node +// (i.e., it is not being constant folded). +// Note that this does not limit optimization opportunities for the subquery: after +// decorrelation is done, the subquery's body becomes part of the main plan and all +// optimization rules are applied again. Review Comment: verified that the only plan changes we have in TPCDS are trivial (e.g., order of fields in project changes, or Project[a, b] becomes Project[a, a, b], but that gets resolved in physical planning). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
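For readers who have not met the COUNT bug named in the code comment above, the following is a minimal sketch in plain Scala collections, with no Spark dependency. The names `CountBugDemo`, `correlatedCount`, `naiveDecorrelatedCount`, and `fixedDecorrelatedCount` are hypothetical and exist only for illustration. It shows why a rewrite of a correlated `count` subquery into an aggregate plus a left outer join needs special handling, which is presumably why the later detection step has to still see the aggregate node.

```scala
object CountBugDemo {
  // Outer rows are keys; inner rows are (key, value) pairs.
  val outer: Seq[Int] = Seq(1, 2, 3)
  val inner: Seq[(Int, String)] = Seq((1, "a"), (1, "b"), (3, "c"))

  // Correlated semantics:
  //   SELECT k, (SELECT count(*) FROM inner WHERE inner.k = outer.k) FROM outer
  // count(*) over an empty input is 0, never NULL.
  def correlatedCount: Seq[(Int, Option[Long])] =
    outer.map(k => k -> Option(inner.count(_._1 == k).toLong))

  // Naive decorrelation: aggregate the inner side by key, then left outer join.
  // Keys without inner rows produce no aggregate row, so the join yields NULL (None).
  def naiveDecorrelatedCount: Seq[(Int, Option[Long])] = {
    val counts: Map[Int, Long] =
      inner.groupBy(_._1).map { case (k, rows) => k -> rows.size.toLong }
    outer.map(k => k -> counts.get(k))
  }

  // Handling the bug after the join: a missing count becomes 0.
  def fixedDecorrelatedCount: Seq[(Int, Option[Long])] =
    naiveDecorrelatedCount.map { case (k, c) => k -> Option(c.getOrElse(0L)) }
}
```

In this sketch the outer key `2` has no matching inner rows: the correlated form reports a count of 0, the naive join form reports no value, and the fixed form agrees with the correlated one again.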
Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]
agubichev commented on PR #45125: URL: https://github.com/apache/spark/pull/45125#issuecomment-1977518871 @cloud-fan
Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]
agubichev commented on PR #45125: URL: https://github.com/apache/spark/pull/45125#issuecomment-1977518691

> Thanks for the fix, looks good overall.
>
> Let's add a gating flag for this change just in case of any issues.

added a flag
Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]
cloud-fan commented on code in PR #45125: URL: https://github.com/apache/spark/pull/45125#discussion_r1512181287 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala: ## @@ -328,6 +328,31 @@ abstract class Optimizer(catalogManager: CatalogManager) // Do not optimize DPP subquery, as it was created from optimized plan and we should not // optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse. case d: DynamicPruningSubquery => d + case s @ ScalarSubquery(a @ Aggregate(group, _, child), _, _, _, _, mayHaveCountBug) +if conf.getConf(SQLConf.DECORRELATE_SUBQUERY_PREVENT_CONSTANT_FOLDING_FOR_COUNT_BUG) && + mayHaveCountBug.nonEmpty && mayHaveCountBug.get => +// This is a subquery with an aggregate that may suffer from a COUNT bug. +// Detailed COUNT bug detection is done at a later stage (e.g. in +// RewriteCorrelatedScalarSubquery). +// Make sure that the output plan always has the same aggregate node +// (i.e., it is not being constant folded). +// Note that this does not limit optimization opportunities for the subquery: after +// decorrelation is done, the subquery's body becomes part of the main plan and all +// optimization rules are applied again. +val groupRefs = group.flatMap(x => x.references) +val projectOverAggregateChild = Project(groupRefs ++ a.references.toSeq, child) Review Comment: Shouldn't `a.references` always contain `groupRefs`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]
cloud-fan commented on code in PR #45125: URL: https://github.com/apache/spark/pull/45125#discussion_r1512183617 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala: ## @@ -328,6 +328,31 @@ abstract class Optimizer(catalogManager: CatalogManager) // Do not optimize DPP subquery, as it was created from optimized plan and we should not // optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse. case d: DynamicPruningSubquery => d + case s @ ScalarSubquery(a @ Aggregate(group, _, child), _, _, _, _, mayHaveCountBug) +if conf.getConf(SQLConf.DECORRELATE_SUBQUERY_PREVENT_CONSTANT_FOLDING_FOR_COUNT_BUG) && + mayHaveCountBug.nonEmpty && mayHaveCountBug.get => +// This is a subquery with an aggregate that may suffer from a COUNT bug. +// Detailed COUNT bug detection is done at a later stage (e.g. in +// RewriteCorrelatedScalarSubquery). +// Make sure that the output plan always has the same aggregate node +// (i.e., it is not being constant folded). +// Note that this does not limit optimization opportunities for the subquery: after +// decorrelation is done, the subquery's body becomes part of the main plan and all +// optimization rules are applied again. +val groupRefs = group.flatMap(x => x.references) +val projectOverAggregateChild = Project(groupRefs ++ a.references.toSeq, child) +val optimizedPlan = Optimizer.this.execute(Subquery.fromExpression( + s.withNewPlan(projectOverAggregateChild))) +assert(optimizedPlan.isInstanceOf[Subquery]) +val optimizedInput = optimizedPlan.asInstanceOf[Subquery].child + +assert(optimizedInput.output.size == projectOverAggregateChild.output.size) +val updatedProjectList = projectOverAggregateChild.output.zip(optimizedInput.output).map { + case (oldAttr, newAttr) => Alias(newAttr, newAttr.name)(exprId = oldAttr.exprId) Review Comment: This is a bit weird. We don't do this for normal subquery optimization, so why is it needed here?
Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]
agubichev commented on code in PR #45125: URL: https://github.com/apache/spark/pull/45125#discussion_r1513261508 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala: ## @@ -328,6 +328,31 @@ abstract class Optimizer(catalogManager: CatalogManager) // Do not optimize DPP subquery, as it was created from optimized plan and we should not // optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse. case d: DynamicPruningSubquery => d + case s @ ScalarSubquery(a @ Aggregate(group, _, child), _, _, _, _, mayHaveCountBug) +if conf.getConf(SQLConf.DECORRELATE_SUBQUERY_PREVENT_CONSTANT_FOLDING_FOR_COUNT_BUG) && + mayHaveCountBug.nonEmpty && mayHaveCountBug.get => +// This is a subquery with an aggregate that may suffer from a COUNT bug. +// Detailed COUNT bug detection is done at a later stage (e.g. in +// RewriteCorrelatedScalarSubquery). +// Make sure that the output plan always has the same aggregate node +// (i.e., it is not being constant folded). +// Note that this does not limit optimization opportunities for the subquery: after +// decorrelation is done, the subquery's body becomes part of the main plan and all +// optimization rules are applied again. +val groupRefs = group.flatMap(x => x.references) +val projectOverAggregateChild = Project(groupRefs ++ a.references.toSeq, child) Review Comment: you are right, removed the groupRefs -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
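The point agreed on above, that `a.references` already covers `groupRefs`, can be sketched with a simplified expression model. Everything below (`ReferencesDemo`, `Expr`, `Attr`, `Count`, and this `Aggregate`) is a hypothetical stand-in and not Catalyst's actual classes; the sketch assumes, as the review discussion implies, that a plan node's references are collected from all of its expressions, grouping expressions included.

```scala
object ReferencesDemo {
  // Hypothetical, simplified expression model; not Spark's Catalyst classes.
  sealed trait Expr { def references: Set[String] }

  final case class Attr(name: String) extends Expr {
    def references: Set[String] = Set(name)
  }

  final case class Count(child: Expr) extends Expr {
    def references: Set[String] = child.references
  }

  final case class Aggregate(grouping: Seq[Expr], aggregates: Seq[Expr]) {
    // Assumption taken from the review thread: the node's references are the
    // union of the references of every expression it holds.
    def references: Set[String] =
      (grouping ++ aggregates).flatMap(_.references).toSet
  }

  // True when prepending the grouping references adds nothing new.
  def groupRefsAreRedundant(a: Aggregate): Boolean = {
    val groupRefs = a.grouping.flatMap(_.references)
    (groupRefs ++ a.references).toSet == a.references
  }
}
```

Under that assumption, `groupRefs ++ a.references.toSeq` only repeats attributes, which matches the earlier observation in this thread about `Project[a, b]` becoming `Project[a, a, b]`.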
Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]
agubichev commented on code in PR #45125: URL: https://github.com/apache/spark/pull/45125#discussion_r1513262881 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala: ## @@ -328,6 +328,31 @@ abstract class Optimizer(catalogManager: CatalogManager) // Do not optimize DPP subquery, as it was created from optimized plan and we should not // optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse. case d: DynamicPruningSubquery => d + case s @ ScalarSubquery(a @ Aggregate(group, _, child), _, _, _, _, mayHaveCountBug) +if conf.getConf(SQLConf.DECORRELATE_SUBQUERY_PREVENT_CONSTANT_FOLDING_FOR_COUNT_BUG) && + mayHaveCountBug.nonEmpty && mayHaveCountBug.get => +// This is a subquery with an aggregate that may suffer from a COUNT bug. +// Detailed COUNT bug detection is done at a later stage (e.g. in +// RewriteCorrelatedScalarSubquery). +// Make sure that the output plan always has the same aggregate node +// (i.e., it is not being constant folded). +// Note that this does not limit optimization opportunities for the subquery: after +// decorrelation is done, the subquery's body becomes part of the main plan and all +// optimization rules are applied again. +val groupRefs = group.flatMap(x => x.references) +val projectOverAggregateChild = Project(groupRefs ++ a.references.toSeq, child) +val optimizedPlan = Optimizer.this.execute(Subquery.fromExpression( + s.withNewPlan(projectOverAggregateChild))) +assert(optimizedPlan.isInstanceOf[Subquery]) +val optimizedInput = optimizedPlan.asInstanceOf[Subquery].child + +assert(optimizedInput.output.size == projectOverAggregateChild.output.size) +val updatedProjectList = projectOverAggregateChild.output.zip(optimizedInput.output).map { + case (oldAttr, newAttr) => Alias(newAttr, newAttr.name)(exprId = oldAttr.exprId) Review Comment: We are preserving the Aggregate, and optimizing its input via Optimizer.execute(). 
This Project is meant to ensure that the Aggregate still gets attributes with the expected IDs. The "normal" subquery optimization path does not care about aggregates and therefore does not need it. I added a comment to clarify it.
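The ID-preserving step described in this reply can be sketched outside Spark as follows. `RebindIdsDemo`, `Attr`, `Alias`, and `rebind` are hypothetical stand-ins for illustration, not Catalyst's `Attribute` and `Alias`; the sketch only mirrors the `zip` over old and new output shown in the diff.

```scala
object RebindIdsDemo {
  // Hypothetical stand-ins for Catalyst's attribute and alias expressions.
  final case class Attr(name: String, exprId: Long)
  final case class Alias(child: Attr, name: String, exprId: Long)

  // Pair each pre-optimization attribute with the attribute at the same
  // position after optimization, and expose the new one under the old ID.
  def rebind(oldOutput: Seq[Attr], newOutput: Seq[Attr]): Seq[Alias] = {
    require(oldOutput.size == newOutput.size, "optimization must keep the output arity")
    oldOutput.zip(newOutput).map { case (oldAttr, newAttr) =>
      Alias(newAttr, newAttr.name, exprId = oldAttr.exprId)
    }
  }
}
```

Because the preserved Aggregate still refers to the old IDs, a projection built this way lets it consume the optimized input without being rewritten itself.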
Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]
cloud-fan commented on code in PR #45125: URL: https://github.com/apache/spark/pull/45125#discussion_r1513701474 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala: ## @@ -328,6 +328,34 @@ abstract class Optimizer(catalogManager: CatalogManager) // Do not optimize DPP subquery, as it was created from optimized plan and we should not // optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse. case d: DynamicPruningSubquery => d + case s @ ScalarSubquery(a @ Aggregate(group, _, child), _, _, _, _, mayHaveCountBug) +if conf.getConf(SQLConf.DECORRELATE_SUBQUERY_PREVENT_CONSTANT_FOLDING_FOR_COUNT_BUG) && + mayHaveCountBug.nonEmpty && mayHaveCountBug.get => +// This is a subquery with an aggregate that may suffer from a COUNT bug. +// Detailed COUNT bug detection is done at a later stage (e.g. in +// RewriteCorrelatedScalarSubquery). +// Make sure that the output plan always has the same aggregate node +// (i.e., it is not being constant folded). +// Note that this does not limit optimization opportunities for the subquery: after +// decorrelation is done, the subquery's body becomes part of the main plan and all +// optimization rules are applied again. +val projectOverAggregateChild = Project(a.references.toSeq, child) +val optimizedPlan = Optimizer.this.execute(Subquery.fromExpression( + s.withNewPlan(projectOverAggregateChild))) +assert(optimizedPlan.isInstanceOf[Subquery]) +val optimizedInput = optimizedPlan.asInstanceOf[Subquery].child + +assert(optimizedInput.output.size == projectOverAggregateChild.output.size) +// We preserve the top aggregation, but its input has been optimized via +// Optimizer.execute(). +// Make sure that the attributes still have IDs expected by the Aggregate node +// by inserting a project. 
+val updatedProjectList = projectOverAggregateChild.output.zip(optimizedInput.output).map { + case (oldAttr, newAttr) => Alias(newAttr, newAttr.name)(exprId = oldAttr.exprId) +} + +val res = s.withNewPlan(a.withNewChildren(Seq(Project(updatedProjectList, optimizedInput)))) +res Review Comment: ```suggestion s.withNewPlan(a.withNewChildren(Seq(Project(updatedProjectList, optimizedInput)))) ```
Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]
cloud-fan commented on code in PR #45125: URL: https://github.com/apache/spark/pull/45125#discussion_r1513701628 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala: ## @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMON_EXPR_REF, WITH_EX */ object RewriteWithExpression extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = { -plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) { + plan.transformDownWithSubqueriesAndPruning(_.containsPattern(WITH_EXPRESSION)) { Review Comment: why do we need this change? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]
cloud-fan commented on code in PR #45125: URL: https://github.com/apache/spark/pull/45125#discussion_r1513701849 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala: ## @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMON_EXPR_REF, WITH_EX */ object RewriteWithExpression extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = { -plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) { + plan.transformDownWithSubqueriesAndPruning(_.containsPattern(WITH_EXPRESSION)) { Review Comment: I think all subqueries will be optimized recursively. There is no exception. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]
agubichev commented on code in PR #45125: URL: https://github.com/apache/spark/pull/45125#discussion_r1514762032 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala: ## @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMON_EXPR_REF, WITH_EX */ object RewriteWithExpression extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = { -plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) { + plan.transformDownWithSubqueriesAndPruning(_.containsPattern(WITH_EXPRESSION)) { Review Comment: This is needed when a count-bug-susceptible Aggregate contains a WITH expression, because we skip optimizing that Aggregate in OptimizeSubqueries. It only matters because RewriteWithExpression runs very early in the optimization and is not triggered again after we unnest the subquery. Any "later" rule would apply to the rewritten subquery, so it would not be a problem. For these "early" rules there are two options: i) repeat them after the subquery is fully unnested (i.e., the count bug has been handled, and the subquery is replaced with a join), or ii) make sure they run on subqueries. I chose option ii).
Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]
agubichev commented on code in PR #45125: URL: https://github.com/apache/spark/pull/45125#discussion_r1514771822 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala: ## @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMON_EXPR_REF, WITH_EX */ object RewriteWithExpression extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = { -plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) { + plan.transformDownWithSubqueriesAndPruning(_.containsPattern(WITH_EXPRESSION)) { Review Comment: To be more precise, this PR deals with the following subqueries: S: Aggregate (mayhavecountbug = true) | Before this PR, OptimizeSubqueries applied on S; with this PR, OptimizeSubqueries applies on . After we unnest the subquery, the rules apply on the entire query (including the aggregate), so we don't miss any optimization opportunities by excluding the Aggregate in OptimizeSubqueries. However, the rule RewriteWithExpression runs before OptimizeSubqueries, so we need to make sure subqueries are handled there (in case the Aggregate node has a WITH expression in it).
Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]
cloud-fan commented on code in PR #45125: URL: https://github.com/apache/spark/pull/45125#discussion_r1515561516 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala: ## @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMON_EXPR_REF, WITH_EX */ object RewriteWithExpression extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = { -plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) { + plan.transformDownWithSubqueriesAndPruning(_.containsPattern(WITH_EXPRESSION)) { Review Comment: Then this rule becomes O(n^2) complexity as every level of subqueries runs this rule for all subqueries under its level. Thinking about it more, why do we handle the count bug in two places that are far away? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]
jchen5 commented on code in PR #45125: URL: https://github.com/apache/spark/pull/45125#discussion_r1516503722 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala: ## @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMON_EXPR_REF, WITH_EX */ object RewriteWithExpression extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = { -plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) { + plan.transformDownWithSubqueriesAndPruning(_.containsPattern(WITH_EXPRESSION)) { Review Comment: > Thinking about it more, why do we handle the count bug in two places that are far away? I wrote a doc about the reason for it and whether we can change it: https://docs.google.com/document/d/1YCce0DtJ6NkMP1QM1nnfYvkiH1CkHoGyMjKQ8MX5bUM/edit -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]
agubichev commented on code in PR #45125: URL: https://github.com/apache/spark/pull/45125#discussion_r1516770647 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala: ## @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMON_EXPR_REF, WITH_EX */ object RewriteWithExpression extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = { -plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) { + plan.transformDownWithSubqueriesAndPruning(_.containsPattern(WITH_EXPRESSION)) { Review Comment: Why is it O(n^2)? RewriteWithExpression would replace all WITH expressions in one pass when we first call it, wouldn't it? (I know that technically it is also called as part of the optimization loop in OptimizeSubqueries, but by that point all the WITH expressions have been replaced.)
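The two positions in this exchange can be made concrete with a small counting model. This is only a sketch under stated assumptions: `SubqueryTraversalDemo`, `Plan`, and the functions below are hypothetical, the model counts node visits only, and it deliberately ignores the `containsPattern(WITH_EXPRESSION)` pruning that the reply above relies on.

```scala
object SubqueryTraversalDemo {
  // Hypothetical model: a plan that may contain nested subquery plans.
  final case class Plan(subqueries: Seq[Plan])

  // Number of plans reached by one traversal that also descends into subqueries.
  def size(p: Plan): Long = 1L + p.subqueries.map(size).sum

  // Visits if the subquery-descending rule is invoked once, on the top-level plan.
  def visitsSinglePass(root: Plan): Long = size(root)

  // Visits if the rule is invoked again for every nested subquery (for example
  // when each subquery is optimized recursively) and each invocation descends fully.
  def visitsPerLevel(p: Plan): Long =
    size(p) + p.subqueries.map(visitsPerLevel).sum

  // A chain of `depth` plans, each nested as the only subquery of the previous one.
  def chain(depth: Int): Plan =
    (1 until depth).foldLeft(Plan(Nil))((inner, _) => Plan(Seq(inner)))
}
```

For a chain of depth n, the single pass visits n plans while the per-level variant visits n(n+1)/2, which is the quadratic growth raised in the review. Whether that cost is actually paid depends on how much work pruning skips once the WITH expressions are gone, and the thread does not settle that point.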
Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]
cloud-fan commented on PR #45125: URL: https://github.com/apache/spark/pull/45125#issuecomment-2018411588 thanks, merging to master!
Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]
cloud-fan closed pull request #45125: [SPARK-46743][SQL] Count bug after constant folding URL: https://github.com/apache/spark/pull/45125
Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]
jchen5 commented on code in PR #45125: URL: https://github.com/apache/spark/pull/45125#discussion_r1498343024 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala: ## @@ -328,6 +328,30 @@ abstract class Optimizer(catalogManager: CatalogManager) // Do not optimize DPP subquery, as it was created from optimized plan and we should not // optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse. case d: DynamicPruningSubquery => d + case s@ScalarSubquery(a@Aggregate(group, _, child), _, _, _, _, mayHaveCountBug) +if mayHaveCountBug.nonEmpty && mayHaveCountBug.get => +// This is a subquery with an aggregate that may suffer from a COUNT bug. +// Detailed COUNT bug detection is done at a later stage (e.g. in +// RewriteCorrelatedScalarSubquery). +// Make sure that the output plan always has the same aggregate node +// (i.e., it is not being constant folded). +// Note that this does not limit optimization opportunities for the subquery: after +// decorrelation is done, the subquery's body becomes part of the main plan and all +// optimization rules are applied again. +val groupRefs = group.flatMap(x => x.references) +val projectOverSubqueryBody = Project(groupRefs ++ a.references.toSeq, child) Review Comment: maybe projectOverAggregateChild would be clearer ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala: ## @@ -328,6 +328,30 @@ abstract class Optimizer(catalogManager: CatalogManager) // Do not optimize DPP subquery, as it was created from optimized plan and we should not // optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse. case d: DynamicPruningSubquery => d + case s@ScalarSubquery(a@Aggregate(group, _, child), _, _, _, _, mayHaveCountBug) +if mayHaveCountBug.nonEmpty && mayHaveCountBug.get => +// This is a subquery with an aggregate that may suffer from a COUNT bug. +// Detailed COUNT bug detection is done at a later stage (e.g. 
in +// RewriteCorrelatedScalarSubquery). +// Make sure that the output plan always has the same aggregate node +// (i.e., it is not being constant folded). +// Note that this does not limit optimization opportunities for the subquery: after +// decorrelation is done, the subquery's body becomes part of the main plan and all +// optimization rules are applied again. Review Comment: There could potentially be changes in optimization due to the different order in which rules get applied, right? I think that would be quite an edge case, but can we confirm whether there are plan changes in benchmarks at least?