allisonwang-db commented on a change in pull request #33070:
URL: https://github.com/apache/spark/pull/33070#discussion_r668428086



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
##########
@@ -428,7 +451,132 @@ object DecorrelateInnerQuery extends PredicateHelper {
               groupingExpressions = newGroupingExpr ++ referencesToAdd,
               aggregateExpressions = newAggExpr ++ referencesToAdd,
               child = newChild)
-            (newAggregate, joinCond, outerReferenceMap)
+
+            // Preserving domain attributes over an Aggregate with an empty grouping expression
+            // is subject to the "COUNT bug", which can lead to wrong results:
+            //
+            // Suppose the original query is:
+            //   SELECT a, (SELECT COUNT(*) cnt FROM t2 WHERE t1.a = t2.c) FROM t1
+            //
+            // Decorrelated plan:
+            //   Project [a, scalar-subquery [a = c]]
+            //   :  +- Aggregate [c] [count(*) AS cnt, c]
+            //   :     +- Relation [c, d]
+            //   +- Relation [a, b]
+            //
+            // After rewrite:
+            //   Project [a, cnt]
+            //   +- Join LeftOuter (a = c)
+            //      :- Relation [a, b]
+            //      +- Aggregate [c] [count(*) AS cnt, c]
+            //         +- Relation [c, d]
+            //
+            //     T1            T2          T2' (GROUP BY c)
+            // +---+---+     +---+---+     +---+-----+
+            // | a | b |     | c | d |     | c | cnt |
+            // +---+---+     +---+---+     +---+-----+
+            // | 0 | 1 |     | 0 | 2 |     | 0 | 2   |
+            // | 1 | 2 |     | 0 | 3 |     +---+-----+
+            // +---+---+     +---+---+
+            //
+            // T1 nested loop join T2     T1 left outer join T2'
+            // on (a = c):                on (a = c):
+            // +---+-----+                +---+------+
+            // | a | cnt |                | a | cnt  |
+            // +---+-----+                +---+------+
+            // | 0 | 2   |                | 0 | 2    |
+            // | 1 | 0   | <--- correct   | 1 | null | <--- wrong result
+            // +---+-----+                +---+------+
+            //
+            // If an aggregate is subject to the COUNT bug:
+            // 1) add a column `true AS alwaysTrue` to the result of the aggregate
+            // 2) insert a left outer domain join between the outer query and this aggregate
+            // 3) rewrite the original aggregate's output column using the default value of the
+            //    aggregate function and the alwaysTrue column.
+            //
+            // For example, T1 left outer join T2' with `alwaysTrue` marker:
+            // +---+------+------------+--------------------------------+
+            // | a | cnt  | alwaysTrue | if(isnull(alwaysTrue), 0, cnt) |
+            // +---+------+------------+--------------------------------+
+            // | 0 | 2    | true       | 2                              |
+            // | 1 | null | null       | 0                              |  <--- correct result
+            // +---+------+------------+--------------------------------+
+            if (groupingExpressions.isEmpty && handleCountBug) {
+              // Evaluate the aggregate expressions with zero tuples.
+              val resultMap = RewriteCorrelatedScalarSubquery.evalAggregateOnZeroTups(newAggregate)
+              val alwaysTrue = Alias(Literal.TrueLiteral, "alwaysTrue")()
+              val alwaysTrueRef = alwaysTrue.toAttribute.withNullability(true)
+              val expressions = ArrayBuffer.empty[NamedExpression]
+              // Create new aliases for aggregate expressions that have non-null default
+              // values and reconstruct the output with the `alwaysTrue` marker.
+              val projectList = newAggregate.aggregateExpressions.map { a =>
+                resultMap.get(a.exprId) match {
+                  // Aggregate expression is not subject to the count bug.
+                  case Some(Literal(null, _)) | None =>
+                    expressions += a
+                    // The attribute is nullable since it is from the right-hand side of a
+                    // left outer join.
+                    a.toAttribute.withNullability(true)
+                  case Some(default) =>
+                    val newAttr = a.newInstance()

Review comment:
   Yes, this holds because if `a` is an attribute, evaluating it with zero tuples yields null:

   https://github.com/apache/spark/blob/c46342e3d057bcc949b4caf016514ff05e0a1ebd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala#L407

   Another possibility is an `OuterReference`. Let me verify whether outer references work in this case.
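To make the COUNT bug in the diff comment concrete, here is a minimal sketch that evaluates the example query both ways on plain Scala collections, with `None` standing in for SQL NULL. It does not use Spark or Catalyst; `CountBugDemo` and its members are hypothetical names chosen for illustration.

```scala
// Hypothetical model of the COUNT bug on plain Scala collections (not Spark APIs).
object CountBugDemo {
  // t1(a, b) and t2(c, d), taken from the example tables in the diff comment.
  val t1: Seq[(Int, Int)] = Seq((0, 1), (1, 2))
  val t2: Seq[(Int, Int)] = Seq((0, 2), (0, 3))

  // Correlated semantics: for every outer row, count the matching inner rows.
  // An empty match set still yields a count of 0.
  val correlated: Seq[(Int, Option[Long])] =
    t1.map { case (a, _) => (a, Some(t2.count { case (c, _) => c == a }.toLong)) }

  // Decorrelated rewrite: GROUP BY c first, then LEFT OUTER JOIN on a = c.
  // Groups exist only for c values present in t2.
  val grouped: Map[Int, Long] =
    t2.groupBy(_._1).map { case (c, rows) => (c, rows.size.toLong) }

  // The outer row a = 1 has no partner group, so its cnt becomes NULL (None).
  val leftOuterJoined: Seq[(Int, Option[Long])] =
    t1.map { case (a, _) => (a, grouped.get(a)) }
}
```

`correlated` gives `0` for `a = 1`, while `leftOuterJoined` gives NULL, matching the "correct" and "wrong result" tables.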

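The three-step `alwaysTrue` rewrite described in the diff comment can be sketched the same way. This is again a hypothetical model on Scala collections (`AlwaysTrueFixDemo` is an illustrative name, not a Spark class), assuming that the zero-row value of `COUNT(*)` is 0.

```scala
// Hypothetical model of the alwaysTrue rewrite (not Spark APIs).
object AlwaysTrueFixDemo {
  val t1: Seq[(Int, Int)] = Seq((0, 1), (1, 2))
  val t2: Seq[(Int, Int)] = Seq((0, 2), (0, 3))

  // Step 1: aggregate the inner side and append an `alwaysTrue` column.
  // Every row produced by the aggregate carries alwaysTrue = true.
  val aggWithMarker: Map[Int, (Long, Boolean)] =
    t2.groupBy(_._1).map { case (c, rows) => (c, (rows.size.toLong, true)) }

  // Step 2: left outer join t1 with the aggregate on a = c.
  // Unmatched outer rows get NULL (None) for both cnt and alwaysTrue.
  val joined: Seq[(Int, Option[Long], Option[Boolean])] =
    t1.map { case (a, _) =>
      aggWithMarker.get(a) match {
        case Some((cnt, marker)) => (a, Some(cnt), Some(marker))
        case None                => (a, None, None)
      }
    }

  // Step 3: if(isnull(alwaysTrue), <COUNT(*) on zero rows>, cnt).
  val countDefault: Long = 0L
  val result: Seq[(Int, Option[Long])] =
    joined.map { case (a, cnt, alwaysTrue) =>
      (a, if (alwaysTrue.isEmpty) Some(countDefault) else cnt)
    }
}
```

The marker is used instead of simply coalescing `cnt` with the default because, for general aggregate expressions, a NULL output column does not by itself show that the join found no match: a matched group can legitimately produce NULL. `alwaysTrue` is NULL only for outer rows without a match.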

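The review comment relies on how an aggregate's output expressions evaluate when the aggregate receives zero input rows, since that value decides which branch of the `resultMap.get(a.exprId)` match applies. The sketch below models this with a small hypothetical expression tree. It is not Catalyst's `Expression` hierarchy or the actual `evalAggregateOnZeroTups` implementation; it only assumes that on empty input COUNT yields 0, SUM yields NULL, and a bare attribute yields NULL.

```scala
// Hypothetical mini expression tree; not Catalyst's classes.
object ZeroTupleEval {
  sealed trait Expr
  final case class Attr(name: String) extends Expr      // a column reference
  final case class Lit(value: Option[Long]) extends Expr // None models NULL
  case object CountStar extends Expr
  final case class Sum(child: Expr) extends Expr
  final case class Add(left: Expr, right: Expr) extends Expr

  // Value of an aggregate output expression when the aggregate sees no rows.
  def evalOnZeroTuples(e: Expr): Option[Long] = e match {
    case Attr(_)   => None      // no row to read the column from
    case Lit(v)    => v
    case CountStar => Some(0L)  // COUNT over empty input is 0
    case Sum(_)    => None      // SUM over empty input is NULL
    case Add(l, r) =>           // NULL-propagating addition
      for (x <- evalOnZeroTuples(l); y <- evalOnZeroTuples(r)) yield x + y
  }

  // Mirrors the two branches in the diff: a NULL default means the left outer
  // join already yields the right answer; a non-NULL default needs the rewrite.
  def subjectToCountBug(e: Expr): Boolean = evalOnZeroTuples(e).isDefined
}
```

Under these assumptions a plain attribute falls into the `Some(Literal(null, _))` branch, while `COUNT(*)` and expressions such as `COUNT(*) + 1` need the `alwaysTrue` rewrite.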


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


