Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

2024-03-04 Thread via GitHub


agubichev commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1511830190


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##
@@ -328,6 +328,30 @@ abstract class Optimizer(catalogManager: CatalogManager)
   // Do not optimize DPP subquery, as it was created from optimized plan and we should not
   // optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse.
   case d: DynamicPruningSubquery => d
+  case s@ScalarSubquery(a@Aggregate(group, _, child), _, _, _, _, mayHaveCountBug)
+    if mayHaveCountBug.nonEmpty && mayHaveCountBug.get =>
+    // This is a subquery with an aggregate that may suffer from a COUNT bug.
+    // Detailed COUNT bug detection is done at a later stage (e.g. in
+    // RewriteCorrelatedScalarSubquery).
+    // Make sure that the output plan always has the same aggregate node
+    // (i.e., it is not being constant folded).
+    // Note that this does not limit optimization opportunities for the subquery: after
+    // decorrelation is done, the subquery's body becomes part of the main plan and all
+    // optimization rules are applied again.
+    val groupRefs = group.flatMap(x => x.references)
+    val projectOverSubqueryBody = Project(groupRefs ++ a.references.toSeq, child)

Review Comment:
   renamed
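For readers new to the term: the COUNT bug mentioned in the code comment is a classic decorrelation pitfall. A correlated COUNT(*) subquery gets rewritten into a pre-aggregation plus a LEFT OUTER JOIN. Outer rows with no matching group then receive NULL, when the correct answer is 0. Below is a minimal sketch using plain Scala collections rather than Catalyst. The table contents and the names `CountBugDemo`, `correlated`, `naiveDecorrelated` and `fixedDecorrelated` are hypothetical. The "fix" is also simplified: it just substitutes COUNT's empty-input value (0) when the join finds no match. It does not describe how RewriteCorrelatedScalarSubquery actually implements this.

```scala
object CountBugDemo {
  // Hypothetical toy tables: t1(a) and t2(b), as plain Scala collections.
  val t1: Seq[Int] = Seq(1, 2, 3)
  val t2: Seq[Int] = Seq(1, 1, 2)

  // Reference semantics of:
  //   SELECT a, (SELECT COUNT(*) FROM t2 WHERE t2.b = t1.a) FROM t1
  // COUNT(*) over an empty set is 0, never NULL.
  def correlated: Seq[(Int, Option[Long])] =
    t1.map(a => (a, Some(t2.count(_ == a).toLong)))

  // Naive decorrelation: pre-aggregate t2 grouped by b, then LEFT OUTER JOIN on b = a.
  // An outer row with no matching group (a = 3) gets NULL (None) instead of 0.
  def naiveDecorrelated: Seq[(Int, Option[Long])] = {
    val counts: Map[Int, Long] =
      t2.groupBy(identity).map { case (b, rows) => b -> rows.size.toLong }
    t1.map(a => (a, counts.get(a)))
  }

  // Simplified count-bug fix: when the join finds no match, substitute the value
  // the aggregate produces on empty input (0 for COUNT).
  def fixedDecorrelated: Seq[(Int, Option[Long])] =
    naiveDecorrelated.map { case (a, c) => (a, c.orElse(Some(0L))) }

  def main(args: Array[String]): Unit = {
    println(correlated)
    println(naiveDecorrelated)
    println(fixedDecorrelated)
  }
}
```

This is why the aggregate node matters. Count-bug handling needs to know that the subquery's top operator is an aggregate, so it can tell what value to produce for "no matching rows".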



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

2024-03-04 Thread via GitHub


agubichev commented on PR #45125:
URL: https://github.com/apache/spark/pull/45125#issuecomment-1977503018

   > What about if there's another node above the aggregate in the subquery, such as a filter after the aggregate (having clause)?
   
   added a test, but any non-trivial node above the aggregate (such as a filter) results in a DomainJoin, so constant folding does not kick in.





Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

2024-03-04 Thread via GitHub


agubichev commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1511831412


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##
@@ -328,6 +328,30 @@ abstract class Optimizer(catalogManager: CatalogManager)
   // Do not optimize DPP subquery, as it was created from optimized plan and we should not
   // optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse.
   case d: DynamicPruningSubquery => d
+  case s@ScalarSubquery(a@Aggregate(group, _, child), _, _, _, _, mayHaveCountBug)
+    if mayHaveCountBug.nonEmpty && mayHaveCountBug.get =>
+    // This is a subquery with an aggregate that may suffer from a COUNT bug.
+    // Detailed COUNT bug detection is done at a later stage (e.g. in
+    // RewriteCorrelatedScalarSubquery).
+    // Make sure that the output plan always has the same aggregate node
+    // (i.e., it is not being constant folded).
+    // Note that this does not limit optimization opportunities for the subquery: after
+    // decorrelation is done, the subquery's body becomes part of the main plan and all
+    // optimization rules are applied again.

Review Comment:
   verified that the only plan changes we have in TPCDS are trivial (e.g., the order of fields in a project changes, or Project[a, b] becomes Project[a, a, b], but that gets resolved in physical planning).






Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

2024-03-04 Thread via GitHub


agubichev commented on PR #45125:
URL: https://github.com/apache/spark/pull/45125#issuecomment-1977518871

   @cloud-fan 





Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

2024-03-04 Thread via GitHub


agubichev commented on PR #45125:
URL: https://github.com/apache/spark/pull/45125#issuecomment-1977518691

   > Thanks for the fix, looks good overall.
   > 
   > Let's add a gating flag for this change just in case of any issues.
   
   added a flag





Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

2024-03-04 Thread via GitHub


cloud-fan commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1512181287


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##
@@ -328,6 +328,31 @@ abstract class Optimizer(catalogManager: CatalogManager)
   // Do not optimize DPP subquery, as it was created from optimized plan and we should not
   // optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse.
   case d: DynamicPruningSubquery => d
+  case s @ ScalarSubquery(a @ Aggregate(group, _, child), _, _, _, _, mayHaveCountBug)
+    if conf.getConf(SQLConf.DECORRELATE_SUBQUERY_PREVENT_CONSTANT_FOLDING_FOR_COUNT_BUG) &&
+      mayHaveCountBug.nonEmpty && mayHaveCountBug.get =>
+    // This is a subquery with an aggregate that may suffer from a COUNT bug.
+    // Detailed COUNT bug detection is done at a later stage (e.g. in
+    // RewriteCorrelatedScalarSubquery).
+    // Make sure that the output plan always has the same aggregate node
+    // (i.e., it is not being constant folded).
+    // Note that this does not limit optimization opportunities for the subquery: after
+    // decorrelation is done, the subquery's body becomes part of the main plan and all
+    // optimization rules are applied again.
+    val groupRefs = group.flatMap(x => x.references)
+    val projectOverAggregateChild = Project(groupRefs ++ a.references.toSeq, child)

Review Comment:
   Shouldn't `a.references` always contain `groupRefs`?






Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

2024-03-04 Thread via GitHub


cloud-fan commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1512183617


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##
@@ -328,6 +328,31 @@ abstract class Optimizer(catalogManager: CatalogManager)
   // Do not optimize DPP subquery, as it was created from optimized plan and we should not
   // optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse.
   case d: DynamicPruningSubquery => d
+  case s @ ScalarSubquery(a @ Aggregate(group, _, child), _, _, _, _, mayHaveCountBug)
+    if conf.getConf(SQLConf.DECORRELATE_SUBQUERY_PREVENT_CONSTANT_FOLDING_FOR_COUNT_BUG) &&
+      mayHaveCountBug.nonEmpty && mayHaveCountBug.get =>
+    // This is a subquery with an aggregate that may suffer from a COUNT bug.
+    // Detailed COUNT bug detection is done at a later stage (e.g. in
+    // RewriteCorrelatedScalarSubquery).
+    // Make sure that the output plan always has the same aggregate node
+    // (i.e., it is not being constant folded).
+    // Note that this does not limit optimization opportunities for the subquery: after
+    // decorrelation is done, the subquery's body becomes part of the main plan and all
+    // optimization rules are applied again.
+    val groupRefs = group.flatMap(x => x.references)
+    val projectOverAggregateChild = Project(groupRefs ++ a.references.toSeq, child)
+    val optimizedPlan = Optimizer.this.execute(Subquery.fromExpression(
+      s.withNewPlan(projectOverAggregateChild)))
+    assert(optimizedPlan.isInstanceOf[Subquery])
+    val optimizedInput = optimizedPlan.asInstanceOf[Subquery].child
+
+    assert(optimizedInput.output.size == projectOverAggregateChild.output.size)
+    val updatedProjectList = projectOverAggregateChild.output.zip(optimizedInput.output).map {
+      case (oldAttr, newAttr) => Alias(newAttr, newAttr.name)(exprId = oldAttr.exprId)

Review Comment:
   This is a bit weird. We don't do this for normal subquery optimization; why is it needed here?






Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

2024-03-05 Thread via GitHub


agubichev commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1513261508


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##
@@ -328,6 +328,31 @@ abstract class Optimizer(catalogManager: CatalogManager)
   // Do not optimize DPP subquery, as it was created from optimized plan and we should not
   // optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse.
   case d: DynamicPruningSubquery => d
+  case s @ ScalarSubquery(a @ Aggregate(group, _, child), _, _, _, _, mayHaveCountBug)
+    if conf.getConf(SQLConf.DECORRELATE_SUBQUERY_PREVENT_CONSTANT_FOLDING_FOR_COUNT_BUG) &&
+      mayHaveCountBug.nonEmpty && mayHaveCountBug.get =>
+    // This is a subquery with an aggregate that may suffer from a COUNT bug.
+    // Detailed COUNT bug detection is done at a later stage (e.g. in
+    // RewriteCorrelatedScalarSubquery).
+    // Make sure that the output plan always has the same aggregate node
+    // (i.e., it is not being constant folded).
+    // Note that this does not limit optimization opportunities for the subquery: after
+    // decorrelation is done, the subquery's body becomes part of the main plan and all
+    // optimization rules are applied again.
+    val groupRefs = group.flatMap(x => x.references)
+    val projectOverAggregateChild = Project(groupRefs ++ a.references.toSeq, child)

Review Comment:
   you are right, removed the groupRefs
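Here is a minimal sketch of why dropping groupRefs is safe. It uses toy stand-ins rather than Catalyst classes. Suppose a node's references are collected from all of its expressions. As far as I know, that is how Catalyst's `QueryPlan.references` works, so an Aggregate's grouping expressions are included. Then every grouping reference is already in `a.references`, and prepending groupRefs only duplicates columns in the Project. All names below (`Attr`, `Ref`, `Count`, `Add`, `ToyAggregate`) are hypothetical.

```scala
object AggregateReferencesDemo {
  // Toy attribute: a name plus a unique id (loosely modelled on Catalyst's exprId).
  final case class Attr(name: String, id: Long)

  // Toy expressions; each one reports the attributes it reads.
  sealed trait Expr { def references: Set[Attr] }
  final case class Ref(attr: Attr) extends Expr { def references: Set[Attr] = Set(attr) }
  final case class Count(child: Expr) extends Expr { def references: Set[Attr] = child.references }
  final case class Add(left: Expr, right: Expr) extends Expr {
    def references: Set[Attr] = left.references ++ right.references
  }

  // Toy Aggregate: references are gathered from grouping AND aggregate expressions.
  final case class ToyAggregate(grouping: Seq[Expr], aggregates: Seq[Expr]) {
    def references: Set[Attr] = (grouping ++ aggregates).flatMap(_.references).toSet
  }

  def main(args: Array[String]): Unit = {
    val x = Attr("x", 1L)
    val y = Attr("y", 2L)
    // Roughly: SELECT count(y) ... GROUP BY x + x
    val a = ToyAggregate(grouping = Seq(Add(Ref(x), Ref(x))), aggregates = Seq(Count(Ref(y))))

    val groupRefs = a.grouping.flatMap(_.references)
    println(groupRefs.forall(a.references.contains)) // every grouping reference is already covered
    println(groupRefs ++ a.references.toSeq)         // x appears twice: a redundant column
  }
}
```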






Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

2024-03-05 Thread via GitHub


agubichev commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1513262881


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##
@@ -328,6 +328,31 @@ abstract class Optimizer(catalogManager: CatalogManager)
   // Do not optimize DPP subquery, as it was created from optimized plan and we should not
   // optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse.
   case d: DynamicPruningSubquery => d
+  case s @ ScalarSubquery(a @ Aggregate(group, _, child), _, _, _, _, mayHaveCountBug)
+    if conf.getConf(SQLConf.DECORRELATE_SUBQUERY_PREVENT_CONSTANT_FOLDING_FOR_COUNT_BUG) &&
+      mayHaveCountBug.nonEmpty && mayHaveCountBug.get =>
+    // This is a subquery with an aggregate that may suffer from a COUNT bug.
+    // Detailed COUNT bug detection is done at a later stage (e.g. in
+    // RewriteCorrelatedScalarSubquery).
+    // Make sure that the output plan always has the same aggregate node
+    // (i.e., it is not being constant folded).
+    // Note that this does not limit optimization opportunities for the subquery: after
+    // decorrelation is done, the subquery's body becomes part of the main plan and all
+    // optimization rules are applied again.
+    val groupRefs = group.flatMap(x => x.references)
+    val projectOverAggregateChild = Project(groupRefs ++ a.references.toSeq, child)
+    val optimizedPlan = Optimizer.this.execute(Subquery.fromExpression(
+      s.withNewPlan(projectOverAggregateChild)))
+    assert(optimizedPlan.isInstanceOf[Subquery])
+    val optimizedInput = optimizedPlan.asInstanceOf[Subquery].child
+
+    assert(optimizedInput.output.size == projectOverAggregateChild.output.size)
+    val updatedProjectList = projectOverAggregateChild.output.zip(optimizedInput.output).map {
+      case (oldAttr, newAttr) => Alias(newAttr, newAttr.name)(exprId = oldAttr.exprId)

Review Comment:
   We are preserving the Aggregate and optimizing its input via Optimizer.execute(). This project is meant to ensure that the Aggregate still gets the attributes with the expected IDs. The "normal" subquery optimization path does not care about aggregates and therefore does not need it.
   I added a comment to clarify it.
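Here is a toy sketch of the ID-restoring Project described above. It uses simplified stand-ins; `Attr`, `ToyAlias` and `restoreIds` are hypothetical and are not Catalyst's Attribute/Alias. The optimized child may expose the same columns under new IDs. Aliasing each new attribute back to the ID the Aggregate expects keeps the Aggregate's references valid. Like the PR's code, the sketch pairs columns by position and only checks that the arity matches.

```scala
object ExprIdRemapDemo {
  // Hypothetical stand-ins for Catalyst's attribute and Alias.
  final case class Attr(name: String, exprId: Long)
  final case class ToyAlias(child: Attr, name: String, exprId: Long) {
    // The attribute this alias exposes to the parent (here: the Aggregate).
    def toAttribute: Attr = Attr(name, exprId)
  }

  // Pair the attributes the Aggregate expects with those the optimized child
  // actually produces (by position), and alias each new attribute back to the old id.
  def restoreIds(expected: Seq[Attr], optimized: Seq[Attr]): Seq[ToyAlias] = {
    require(expected.size == optimized.size, "optimized child must keep the same number of columns")
    expected.zip(optimized).map { case (oldAttr, newAttr) =>
      ToyAlias(newAttr, newAttr.name, oldAttr.exprId)
    }
  }

  def main(args: Array[String]): Unit = {
    val expected  = Seq(Attr("b", 10L), Attr("c", 11L)) // ids referenced by the Aggregate
    val optimized = Seq(Attr("b", 42L), Attr("c", 43L)) // same columns, new ids after optimization
    val projectList = restoreIds(expected, optimized)
    println(projectList.map(_.toAttribute) == expected)
  }
}
```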






Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

2024-03-05 Thread via GitHub


cloud-fan commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1513701474


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##
@@ -328,6 +328,34 @@ abstract class Optimizer(catalogManager: CatalogManager)
   // Do not optimize DPP subquery, as it was created from optimized plan and we should not
   // optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse.
   case d: DynamicPruningSubquery => d
+  case s @ ScalarSubquery(a @ Aggregate(group, _, child), _, _, _, _, mayHaveCountBug)
+    if conf.getConf(SQLConf.DECORRELATE_SUBQUERY_PREVENT_CONSTANT_FOLDING_FOR_COUNT_BUG) &&
+      mayHaveCountBug.nonEmpty && mayHaveCountBug.get =>
+    // This is a subquery with an aggregate that may suffer from a COUNT bug.
+    // Detailed COUNT bug detection is done at a later stage (e.g. in
+    // RewriteCorrelatedScalarSubquery).
+    // Make sure that the output plan always has the same aggregate node
+    // (i.e., it is not being constant folded).
+    // Note that this does not limit optimization opportunities for the subquery: after
+    // decorrelation is done, the subquery's body becomes part of the main plan and all
+    // optimization rules are applied again.
+    val projectOverAggregateChild = Project(a.references.toSeq, child)
+    val optimizedPlan = Optimizer.this.execute(Subquery.fromExpression(
+      s.withNewPlan(projectOverAggregateChild)))
+    assert(optimizedPlan.isInstanceOf[Subquery])
+    val optimizedInput = optimizedPlan.asInstanceOf[Subquery].child
+
+    assert(optimizedInput.output.size == projectOverAggregateChild.output.size)
+    // We preserve the top aggregation, but its input has been optimized via
+    // Optimizer.execute().
+    // Make sure that the attributes still have IDs expected by the Aggregate node
+    // by inserting a project.
+    val updatedProjectList = projectOverAggregateChild.output.zip(optimizedInput.output).map {
+      case (oldAttr, newAttr) => Alias(newAttr, newAttr.name)(exprId = oldAttr.exprId)
+    }
+
+    val res = s.withNewPlan(a.withNewChildren(Seq(Project(updatedProjectList, optimizedInput))))
+    res

Review Comment:
   ```suggestion
   s.withNewPlan(a.withNewChildren(Seq(Project(updatedProjectList, optimizedInput))))
   ```






Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

2024-03-05 Thread via GitHub


cloud-fan commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1513701628


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala:
##
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMON_EXPR_REF, WITH_EX
  */
 object RewriteWithExpression extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) {
+    plan.transformDownWithSubqueriesAndPruning(_.containsPattern(WITH_EXPRESSION)) {

Review Comment:
   why do we need this change?






Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

2024-03-05 Thread via GitHub


cloud-fan commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1513701849


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala:
##
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMON_EXPR_REF, WITH_EX
  */
 object RewriteWithExpression extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) {
+    plan.transformDownWithSubqueriesAndPruning(_.containsPattern(WITH_EXPRESSION)) {

Review Comment:
   I think all subqueries will be optimized recursively. There is no exception.






Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

2024-03-06 Thread via GitHub


agubichev commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1514762032


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala:
##
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMON_EXPR_REF, WITH_EX
  */
 object RewriteWithExpression extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) {
+    plan.transformDownWithSubqueriesAndPruning(_.containsPattern(WITH_EXPRESSION)) {

Review Comment:
   This is needed when an Aggregate (count-bug-susceptible) has a WITH expression in it, because we skip optimizing it in OptimizeSubqueries. This only matters because RewriteWithExpression appears very early in the optimization, and it is not triggered after we unnest the subquery. Any "later" rule would apply to the rewritten subquery, so it would not be a problem. For these "early" rules there are two options: i) repeat them after the subquery is fully unnested (i.e., the count bug has been handled, and the subquery is replaced with a join), or ii) make sure they run on subqueries. I chose option ii).
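The sketch below illustrates option ii) and the one-line diff above. As I understand Catalyst, `transformWithPruning` walks only the plan nodes of the current tree. `transformDownWithSubqueriesAndPruning` also descends into plans held inside subquery expressions. The sketch models that difference with hypothetical toy types and is not the real rule: `WithExpr` stands in for a WITH expression, `Rewritten` for its rewritten form, and `SubqueryExpr` for a subquery expression.

```scala
object SubqueryTraversalDemo {
  // Hypothetical toy expression and plan types.
  sealed trait Expr
  final case class WithExpr(body: String) extends Expr    // stands in for a WITH expression
  final case class Rewritten(body: String) extends Expr   // what the rewrite turns it into
  final case class SubqueryExpr(plan: Plan) extends Expr  // an expression that holds a nested plan
  final case class Plan(exprs: Seq[Expr], children: Seq[Plan])

  // Rewrite WITH expressions in `plan` and its children. Nested subquery plans
  // are only visited when `intoSubqueries` is true.
  def transform(plan: Plan, intoSubqueries: Boolean): Plan = {
    def rewrite(e: Expr): Expr = e match {
      case WithExpr(b)                       => Rewritten(b)
      case SubqueryExpr(p) if intoSubqueries => SubqueryExpr(transform(p, intoSubqueries))
      case other                             => other
    }
    Plan(plan.exprs.map(rewrite), plan.children.map(transform(_, intoSubqueries)))
  }

  def main(args: Array[String]): Unit = {
    // Outer plan with a WITH expression, plus a subquery that has its own WITH expression.
    val query = Plan(Seq(WithExpr("outer"), SubqueryExpr(Plan(Seq(WithExpr("inner")), Nil))), Nil)
    println(transform(query, intoSubqueries = false)) // inner WITH left untouched
    println(transform(query, intoSubqueries = true))  // inner WITH rewritten too
  }
}
```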






Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

2024-03-06 Thread via GitHub


agubichev commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1514771822


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala:
##
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMON_EXPR_REF, WITH_EX
  */
 object RewriteWithExpression extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) {
+    plan.transformDownWithSubqueriesAndPruning(_.containsPattern(WITH_EXPRESSION)) {

Review Comment:
   To be more precise, this PR deals with the following subqueries:
   S:
   Aggregate (mayhavecountbug = true)
      |
   (the Aggregate's child)
   
   Before this PR, OptimizeSubqueries applied to S; with this PR, OptimizeSubqueries applies only to the Aggregate's child.
   After we unnest the subquery, the rules apply to the entire query (including the aggregate), so we don't miss any optimization opportunities by excluding the Aggregate in OptimizeSubqueries.
   However, the rule RewriteWithExpression runs before OptimizeSubqueries, so we need to make sure subqueries are handled there (in case the Aggregate node has a WITH expression in it).
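Here is a toy illustration of the scoping change described in the comment above. If the optimizer runs on all of S, a folding rule can replace the aggregate entirely, and a later count-bug check that looks for the Aggregate no longer finds it. If it runs only on the Aggregate's child, the aggregate survives. The plan types are hypothetical simplifications, and correlation is ignored entirely. The rule that folds COUNT(*) over an empty input into a constant row is also hypothetical; it is not a claim about which Spark rule triggered SPARK-46743.

```scala
object PreserveAggregateDemo {
  // Hypothetical toy plan nodes.
  sealed trait Plan
  final case class Relation(name: String) extends Plan
  final case class Filter(condition: Boolean, child: Plan) extends Plan // condition already folded
  case object EmptyRelation extends Plan
  final case class CountStar(child: Plan) extends Plan   // global COUNT(*) aggregate
  final case class ConstantRow(value: Long) extends Plan // aggregate-free, folded result

  // Toy optimizer with hypothetical folding rules.
  def optimize(plan: Plan): Plan = plan match {
    case Filter(false, _)    => EmptyRelation
    case Filter(true, child) => optimize(child)
    case CountStar(child) =>
      optimize(child) match {
        case EmptyRelation  => ConstantRow(0L) // hypothetical: fold COUNT(*) over empty input
        case optimizedChild => CountStar(optimizedChild)
      }
    case other => other
  }

  // Simplified version of the PR's scoping: keep the top aggregate, optimize only its child.
  def optimizeChildOnly(plan: Plan): Plan = plan match {
    case CountStar(child) => CountStar(optimize(child))
    case other            => optimize(other)
  }

  def main(args: Array[String]): Unit = {
    val s = CountStar(Filter(false, Relation("t2")))
    println(optimize(s))          // the aggregate node is gone
    println(optimizeChildOnly(s)) // the aggregate node survives for later count-bug handling
  }
}
```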






Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

2024-03-06 Thread via GitHub


cloud-fan commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1515561516


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala:
##
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMON_EXPR_REF, WITH_EX
  */
 object RewriteWithExpression extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) {
+    plan.transformDownWithSubqueriesAndPruning(_.containsPattern(WITH_EXPRESSION)) {

Review Comment:
   Then this rule has O(n^2) complexity, as every level of subqueries runs this rule for all subqueries under its level.
   
   Thinking about it more, why do we handle the count bug in two places that are far away?






Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

2024-03-07 Thread via GitHub


jchen5 commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1516503722


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala:
##
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMON_EXPR_REF, WITH_EX
  */
 object RewriteWithExpression extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) {
+    plan.transformDownWithSubqueriesAndPruning(_.containsPattern(WITH_EXPRESSION)) {

Review Comment:
   > Thinking about it more, why do we handle the count bug in two places that are far away?
   
   I wrote a doc about the reason for it and whether we can change it: https://docs.google.com/document/d/1YCce0DtJ6NkMP1QM1nnfYvkiH1CkHoGyMjKQ8MX5bUM/edit






Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

2024-03-07 Thread via GitHub


agubichev commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1516770647


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteWithExpression.scala:
##
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.trees.TreePattern.{COMMON_EXPR_REF, WITH_EX
  */
 object RewriteWithExpression extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    plan.transformWithPruning(_.containsPattern(WITH_EXPRESSION)) {
+    plan.transformDownWithSubqueriesAndPruning(_.containsPattern(WITH_EXPRESSION)) {

Review Comment:
   Why is it O(n^2)? RewriteWithExpression would replace all WITH expressions in one pass when we first call it, wouldn't it? (I know that technically it is also called as part of the optimization loop in OptimizeSubqueries, but by that point all the WITH expressions are replaced.)
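Below is a back-of-the-envelope toy model of the two positions above; it is not a measurement of Spark. Suppose the rule is invoked once per nesting level and each invocation walks every subquery below it. Then the total work over n levels is n + (n-1) + ... + 1 = n(n+1)/2, i.e. O(n^2). Suppose instead that the first (outermost) invocation rewrites every WITH expression, and later invocations can tell cheaply that nothing is left, which is the role the pruning condition plays. In that case the later invocations cost no node visits in this model. One assumption of the model is not established in this thread: that Spark's pruning condition can actually see WITH expressions inside nested subqueries. `RuleCostDemo` and `simulate` are hypothetical.

```scala
object RuleCostDemo {
  // Toy model: a chain of `depth` nested subqueries, each with one WITH expression.
  // The optimizer invokes the rule once per level (outermost first), and each
  // invocation walks its own level and every subquery level below it.
  // Returns the number of levels visited across all invocations.
  def simulate(depth: Int, pruning: Boolean): Long = {
    val pending = Array.fill(depth)(true) // WITH expression not yet rewritten at this level
    var remaining = depth                 // O(1) stand-in for a cached "pattern present" bit
    var visits = 0L
    for (start <- 0 until depth) {
      // With pruning, an invocation that sees nothing left to rewrite returns immediately.
      if (!pruning || remaining > 0) {
        for (level <- start until depth) {
          visits += 1
          if (pending(level)) { pending(level) = false; remaining -= 1 }
        }
      }
    }
    visits
  }

  def main(args: Array[String]): Unit = {
    println(simulate(10, pruning = false)) // 10 + 9 + ... + 1
    println(simulate(10, pruning = true))  // only the first, outermost pass does work
  }
}
```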






Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

2024-03-25 Thread via GitHub


cloud-fan commented on PR #45125:
URL: https://github.com/apache/spark/pull/45125#issuecomment-2018411588

   thanks, merging to master!





Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

2024-03-25 Thread via GitHub


cloud-fan closed pull request #45125: [SPARK-46743][SQL] Count bug after constant folding
URL: https://github.com/apache/spark/pull/45125





Re: [PR] [SPARK-46743][SQL] Count bug after constant folding [spark]

2024-02-21 Thread via GitHub


jchen5 commented on code in PR #45125:
URL: https://github.com/apache/spark/pull/45125#discussion_r1498343024


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##
@@ -328,6 +328,30 @@ abstract class Optimizer(catalogManager: CatalogManager)
   // Do not optimize DPP subquery, as it was created from optimized plan and we should not
   // optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse.
   case d: DynamicPruningSubquery => d
+  case s@ScalarSubquery(a@Aggregate(group, _, child), _, _, _, _, mayHaveCountBug)
+    if mayHaveCountBug.nonEmpty && mayHaveCountBug.get =>
+    // This is a subquery with an aggregate that may suffer from a COUNT bug.
+    // Detailed COUNT bug detection is done at a later stage (e.g. in
+    // RewriteCorrelatedScalarSubquery).
+    // Make sure that the output plan always has the same aggregate node
+    // (i.e., it is not being constant folded).
+    // Note that this does not limit optimization opportunities for the subquery: after
+    // decorrelation is done, the subquery's body becomes part of the main plan and all
+    // optimization rules are applied again.
+    val groupRefs = group.flatMap(x => x.references)
+    val projectOverSubqueryBody = Project(groupRefs ++ a.references.toSeq, child)

Review Comment:
   maybe projectOverAggregateChild would be clearer



##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##
@@ -328,6 +328,30 @@ abstract class Optimizer(catalogManager: CatalogManager)
   // Do not optimize DPP subquery, as it was created from optimized plan and we should not
   // optimize it again, to save optimization time and avoid breaking broadcast/subquery reuse.
   case d: DynamicPruningSubquery => d
+  case s@ScalarSubquery(a@Aggregate(group, _, child), _, _, _, _, mayHaveCountBug)
+    if mayHaveCountBug.nonEmpty && mayHaveCountBug.get =>
+    // This is a subquery with an aggregate that may suffer from a COUNT bug.
+    // Detailed COUNT bug detection is done at a later stage (e.g. in
+    // RewriteCorrelatedScalarSubquery).
+    // Make sure that the output plan always has the same aggregate node
+    // (i.e., it is not being constant folded).
+    // Note that this does not limit optimization opportunities for the subquery: after
+    // decorrelation is done, the subquery's body becomes part of the main plan and all
+    // optimization rules are applied again.

Review Comment:
   There could potentially be changes in optimization due to the different order in which rules get applied, right? I think that would be quite an edge case, but can we confirm whether there are plan changes in benchmarks at least?


