[GitHub] [spark] allisonwang-db commented on a change in pull request #32179: [SPARK-35080][SQL] Only allow a subset of correlated equality predicates when a subquery is aggregated

2021-04-16 Thread GitBox


allisonwang-db commented on a change in pull request #32179:
URL: https://github.com/apache/spark/pull/32179#discussion_r615178411



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
##
@@ -899,14 +899,71 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
     // +- SubqueryAlias t1, `t1`
     //    +- Project [_1#73 AS c1#76, _2#74 AS c2#77]
     //       +- LocalRelation [_1#73, _2#74]
-    def failOnNonEqualCorrelatedPredicate(found: Boolean, p: LogicalPlan): Unit = {
-      if (found) {
+    // SPARK-35080: The same issue can happen to correlated equality predicates when
+    // they do not guarantee one-to-one mapping between inner and outer attributes.
+    // For example:
+    // Table:
+    //   t1(a, b): [(0, 6), (1, 5), (2, 4)]
+    //   t2(c): [(6)]
+    //
+    // Query:
+    //   SELECT c, (SELECT COUNT(*) FROM t1 WHERE a + b = c) FROM t2
+    //
+    // Original subquery plan:
+    //   Aggregate [count(1)]
+    //   +- Filter ((a + b) = outer(c))
+    //      +- LocalRelation [a, b]
+    //
+    // Plan after pulling up correlated predicates:
+    //   Aggregate [a, b] [count(1), a, b]
+    //   +- LocalRelation [a, b]
+    //
+    // Plan after rewrite:
+    //   Project [c1, count(1)]
+    //   +- Join LeftOuter ((a + b) = c)
+    //      :- LocalRelation [c]
+    //      +- Aggregate [a, b] [count(1), a, b]
+    //         +- LocalRelation [a, b]
+    //
+    // The right hand side of the join transformed from the subquery will output
+    //   count(1) | a | b
+    //          1 | 0 | 6
+    //          1 | 1 | 5
+    //          1 | 2 | 4
+    // and the plan after rewrite will give the original query incorrect results.
+    def failOnUnsupportedCorrelatedPredicate(predicates: Seq[Expression], p: LogicalPlan): Unit = {
+      if (predicates.nonEmpty) {
         // Report a non-supported case as an exception
-        failAnalysis(s"Correlated column is not allowed in a non-equality predicate:\n$p")
+        failAnalysis(s"Correlated column is not allowed in predicate " +
+          s"${predicates.map(_.sql).mkString}:\n$p")
       }
     }
 
-    var foundNonEqualCorrelatedPred: Boolean = false
+    def containsAttribute(e: Expression): Boolean = {
+      e.find(_.isInstanceOf[Attribute]).isDefined
+    }
+
+    // Given a correlated predicate, check if it is either a non-equality predicate or
+    // equality predicate that does not guarantee one-on-one mapping between inner and
+    // outer attributes. When the correlated predicate does not contain any attribute
+    // (i.e. only has outer references), it is supported and should return false. E.G.:
+    //   (a = outer(c)) -> false
+    //   (outer(c) = outer(d)) -> false
+    //   (a > outer(c)) -> true
+    //   (a + b = outer(c)) -> true
+    // The last one is true because there can be multiple combinations of (a, b) that
+    // satisfy the equality condition. For example, if outer(c) = 0, then both (0, 0)
+    // and (-1, 1) can make the predicate evaluate to true.
+    def isUnsupportedPredicate(condition: Expression): Boolean = condition match {
+      // Only allow equality condition with one side being an attribute and another
+      // side being an expression without attributes from the inner query. Note
+      // OuterReference is a leaf node and will not be found here.
+      case Equality(_: Attribute, b) => containsAttribute(b)
+      case Equality(a, _: Attribute) => containsAttribute(a)
+      case o => containsAttribute(o)

Review comment:
   To further reduce the risk, I am thinking about adding a config so that people can turn off this check and allow all correlated equality predicates, at the risk of returning wrong results.
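
   To make the "wrong results" risk concrete, here is a minimal sketch that replays the SPARK-35080 example from the code comment with plain Scala collections instead of Spark plans. The object and method names are illustrative assumptions and are not part of Spark; the sketch only mimics the semantics of the two plans.

```scala
// Toy model of the SPARK-35080 example. Nothing here is Spark API;
// all names are hypothetical and exist only for illustration.
object CorrelatedCountExample {
  val t1: Seq[(Int, Int)] = Seq((0, 6), (1, 5), (2, 4)) // t1(a, b)
  val t2: Seq[Int] = Seq(6)                             // t2(c)

  // Semantics of the original query:
  //   SELECT c, (SELECT COUNT(*) FROM t1 WHERE a + b = c) FROM t2
  def expected: Seq[(Int, Long)] =
    t2.map(c => (c, t1.count { case (a, b) => a + b == c }.toLong))

  // Semantics of the rewritten plan: aggregate t1 by (a, b) first,
  // then left-outer-join the groups to t2 on a + b = c.
  def rewritten: Seq[(Int, Option[Long])] = {
    val groups: Seq[((Int, Int), Long)] =
      t1.groupBy(identity).toSeq.map { case (key, rows) => (key, rows.size.toLong) }
    t2.flatMap { c =>
      val matches = groups.collect { case ((a, b), cnt) if a + b == c => cnt }
      if (matches.isEmpty) Seq((c, None)) else matches.map(cnt => (c, Some(cnt)))
    }
  }
}
```

   In this model `expected` yields a single row with a count of 3, while `rewritten` yields three rows that each carry a count of 1, which is the mismatch the check is meant to prevent.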




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] allisonwang-db commented on a change in pull request #32179: [SPARK-35080][SQL] Only allow a subset of correlated equality predicates when a subquery is aggregated

2021-04-16 Thread GitBox


allisonwang-db commented on a change in pull request #32179:
URL: https://github.com/apache/spark/pull/32179#discussion_r615155910



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
##
@@ -899,14 +899,71 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
+    def containsAttribute(e: Expression): Boolean = {
+      e.find(_.isInstanceOf[Attribute]).isDefined
+    }
+
+    // Given a correlated predicate, check if it is either a non-equality predicate or
+    // equality predicate that does not guarantee one-on-one mapping between inner and
+    // outer attributes. When the correlated predicate does not contain any attribute
+    // (i.e. only has outer references), it is supported and should return false. E.G.:
+    //   (a = outer(c)) -> false
+    //   (outer(c) = outer(d)) -> false
+    //   (a > outer(c)) -> true
+    //   (a + b = outer(c)) -> true
+    // The last one is true because there can be multiple combinations of (a, b) that
+    // satisfy the equality condition. For example, if outer(c) = 0, then both (0, 0)
+    // and (-1, 1) can make the predicate evaluate to true.
+    def isUnsupportedPredicate(condition: Expression): Boolean = condition match {
+      // Only allow equality condition with one side being an attribute and another
+      // side being an expression without attributes from the inner query. Note
+      // OuterReference is a leaf node and will not be found here.
+      case Equality(_: Attribute, b) => containsAttribute(b)
+      case Equality(a, _: Attribute) => containsAttribute(a)
+      case o => containsAttribute(o)

Review comment:
   Yes, `outer(a) = outer(c)` was supported before and does not cause wrong results. I've updated the logic here to allow only equality predicates, consistent with the previous behavior:
   
https://github.com/apache/spark/blob/91bd38467e607dde81d4c83fa3e1c989f8280e89/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala#L954-L957
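
   As a rough illustration of how the quoted `isUnsupportedPredicate` classifies the four examples in its comment, here is a self-contained sketch over a toy expression tree. The `Expr` types below are hypothetical stand-ins and not the Catalyst classes; the sketch models the hunk as quoted in this message, and the later equality-only revision linked above is not modeled.

```scala
// Minimal stand-ins for Catalyst expressions. These are NOT Spark classes;
// every name below is an assumption made for this sketch.
sealed trait Expr { def children: Seq[Expr] }
case class Attr(name: String) extends Expr { def children: Seq[Expr] = Nil }
// An outer reference is modeled as a leaf, so the outer attribute is not visible as a child.
case class OuterRef(name: String) extends Expr { def children: Seq[Expr] = Nil }
case class Add(left: Expr, right: Expr) extends Expr { def children: Seq[Expr] = Seq(left, right) }
case class EqualTo(left: Expr, right: Expr) extends Expr { def children: Seq[Expr] = Seq(left, right) }
case class GreaterThan(left: Expr, right: Expr) extends Expr { def children: Seq[Expr] = Seq(left, right) }

object PredicateCheck {
  // True if any node in the tree is an inner attribute.
  def containsAttribute(e: Expr): Boolean = e match {
    case _: Attr => true
    case other   => other.children.exists(containsAttribute)
  }

  // Mirrors the three cases of the quoted hunk: an equality with a bare attribute
  // on one side is fine as long as the other side has no inner attributes;
  // anything else is unsupported if it mentions an inner attribute at all.
  def isUnsupportedPredicate(condition: Expr): Boolean = condition match {
    case EqualTo(_: Attr, b) => containsAttribute(b)
    case EqualTo(a, _: Attr) => containsAttribute(a)
    case o                   => containsAttribute(o)
  }
}
```

   With these definitions, `a = outer(c)` and `outer(c) = outer(d)` are accepted, while `a > outer(c)` and `a + b = outer(c)` are rejected, matching the examples in the code comment.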







[GitHub] [spark] allisonwang-db commented on a change in pull request #32179: [SPARK-35080][SQL] Only allow a subset of correlated equality predicates when a subquery is aggregated

2021-04-15 Thread GitBox


allisonwang-db commented on a change in pull request #32179:
URL: https://github.com/apache/spark/pull/32179#discussion_r614367593



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
##
@@ -899,14 +899,71 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
+    def containsAttribute(e: Expression): Boolean = {
+      e.find(_.isInstanceOf[Attribute]).isDefined
+    }
+
+    // Given a correlated predicate, check if it is either a non-equality predicate or
+    // equality predicate that does not guarantee one-on-one mapping between inner and
+    // outer attributes. When the correlated predicate does not contain any attribute
+    // (i.e. only has outer references), it is supported and should return false. E.G.:
+    //   (a = outer(c)) -> false
+    //   (outer(c) = outer(d)) -> false
+    //   (a > outer(c)) -> true
+    //   (a + b = outer(c)) -> true
+    // The last one is true because there can be multiple combinations of (a, b) that
+    // satisfy the equality condition. For example, if outer(c) = 0, then both (0, 0)
+    // and (-1, 1) can make the predicate evaluate to true.
+    def isUnsupportedPredicate(condition: Expression): Boolean = condition match {
+      // Only allow equality condition with one side being an attribute and another
+      // side being an expression without attributes from the inner query. Note
+      // OuterReference is a leaf node and will not be found here.
+      case Equality(_: Attribute, b) => containsAttribute(b)
+      case Equality(a, _: Attribute) => containsAttribute(a)
+      case o => containsAttribute(o)

Review comment:
   This is added to handle special cases like `outer(a) = outer(c)`, which should be supported. I'd like to reduce the risk of introducing new bugs, since we need to backport this fix. Please let me know if you think this may cause any issues, and I will remove this special-case handling.







[GitHub] [spark] allisonwang-db commented on a change in pull request #32179: [SPARK-35080][SQL] Only allow a subset of correlated equality predicates when a subquery is aggregated

2021-04-15 Thread GitBox


allisonwang-db commented on a change in pull request #32179:
URL: https://github.com/apache/spark/pull/32179#discussion_r614353737



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
##
@@ -939,10 +939,16 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
       }
     }
 
+    def containsAttribute(e: Expression): Boolean = {
+      e.find(_.isInstanceOf[Attribute]).isDefined
+    }
+
     // Given a correlated predicate, check if it is either a non-equality predicate or
     // equality predicate that does not guarantee one-on-one mapping between inner and
-    // outer attributes. E.G.:
+    // outer attributes. When the correlated predicate does not contain any attribute
+    // (i.e. only has outer references), it is supported and should return false. E.G.:
     //   (a = outer(c)) -> false
+    //   (outer(c) = outer(d)) -> false

Review comment:
   Yes, I added a test case in SubquerySuite.







[GitHub] [spark] allisonwang-db commented on a change in pull request #32179: [SPARK-35080][SQL] Only allow a subset of correlated equality predicates when a subquery is aggregated

2021-04-14 Thread GitBox


allisonwang-db commented on a change in pull request #32179:
URL: https://github.com/apache/spark/pull/32179#discussion_r613741888



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
##
@@ -950,9 +950,15 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
       case f: Filter =>
         val (correlated, _) = splitConjunctivePredicates(f.condition).partition(containsOuter)
 
-        // Find any non-equality correlated predicates
+        // Find any non-equality correlated predicates and equality predicates that do not
+        // guarantee one-on-one mapping between inner and outer attributes. E.G:
+        // a = outer(c) -> true
+        // a > outer(c) -> false
+        // a + b = outer(c) -> false (because there can be multiple combinations of a, b that
+        // satisfy the condition)
         foundNonEqualCorrelatedPred = foundNonEqualCorrelatedPred || correlated.exists {
-          case _: EqualTo | _: EqualNullSafe => false
+          case Equality(_: Attribute, b) => b.find(_.isInstanceOf[Attribute]).isDefined

Review comment:
   Will do!
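
   For readers unfamiliar with the first line of the hunk in this message, the sketch below shows the general idea of splitting a filter condition into its AND-ed conjuncts and partitioning them by whether they reference the outer query. The `Pred`, `Leaf`, and `And` types and the `hasOuter` flag are hypothetical simplifications for illustration, not the Catalyst `splitConjunctivePredicates` or `containsOuter` implementations.

```scala
// Toy predicate tree. These types are assumptions for this sketch, not Spark classes.
sealed trait Pred
// A leaf predicate, tagged with whether it references an outer column.
case class Leaf(sql: String, hasOuter: Boolean) extends Pred
case class And(left: Pred, right: Pred) extends Pred

object SplitSketch {
  // Flatten nested ANDs into a flat list of conjuncts, left to right.
  def splitConjuncts(p: Pred): Seq[Pred] = p match {
    case And(l, r) => splitConjuncts(l) ++ splitConjuncts(r)
    case other     => Seq(other)
  }

  // A predicate is correlated if any part of it references the outer query.
  def containsOuter(p: Pred): Boolean = p match {
    case Leaf(_, hasOuter) => hasOuter
    case And(l, r)         => containsOuter(l) || containsOuter(r)
  }
}
```

   Calling `SplitSketch.splitConjuncts(cond).partition(SplitSketch.containsOuter)` on a condition such as `a = outer(c) AND b > 1` would put the first conjunct in the correlated group and the second in the local group; only the correlated group is then inspected by the check under review.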



