allisonwang-db commented on a change in pull request #33070: URL: https://github.com/apache/spark/pull/33070#discussion_r668428086
########## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala ########## @@ -428,7 +451,132 @@ object DecorrelateInnerQuery extends PredicateHelper { groupingExpressions = newGroupingExpr ++ referencesToAdd, aggregateExpressions = newAggExpr ++ referencesToAdd, child = newChild) - (newAggregate, joinCond, outerReferenceMap) + + // Preserving domain attributes over an Aggregate with an empty grouping expression + // is subject to the "COUNT bug" that can lead to wrong answer: + // + // Suppose the original query is: + // SELECT a, (SELECT COUNT(*) cnt FROM t2 WHERE t1.a = t2.c) FROM t1 + // + // Decorrelated plan: + // Project [a, scalar-subquery [a = c]] + // : +- Aggregate [c] [count(*) AS cnt, c] + // : +- Relation [c, d] + // +- Relation [a, b] + // + // After rewrite: + // Project [a, cnt] + // +- Join LeftOuter (a = c) + // :- Relation [a, b] + // +- Aggregate [c] [count(*) AS cnt, c] + // +- Relation [c, d] + // + // T1 T2 T2' (GROUP BY c) + // +---+---+ +---+---+ +---+-----+ + // | a | b | | c | d | | c | cnt | + // +---+---+ +---+---+ +---+-----+ + // | 0 | 1 | | 0 | 2 | | 0 | 2 | + // | 1 | 2 | | 0 | 3 | +---+-----+ + // +---+---+ +---+---+ + // + // T1 nested loop join T2 T1 left outer join T2' + // on (a = c): on (a = c): + // +---+-----+ +---+------+ + // | a | cnt | | a | cnt | + // +---+-----+ +---+------+ + // | 0 | 2 | | 0 | 2 | + // | 1 | 0 | <--- correct | 1 | null | <--- wrong result + // +---+-----+ +---+------+ + // + // If an aggregate is subject to the COUNT bug: + // 1) add a column `true AS alwaysTrue` to the result of the aggregate + // 2) insert a left outer domain join between the outer query and this aggregate + // 3) rewrite the original aggregate's output column using the default value of the + // aggregate function and the alwaysTrue column. 
+ // + // For example, T1 left outer join T2' with `alwaysTrue` marker: + // +---+------+------------+--------------------------------+ + // | a | cnt | alwaysTrue | if(isnull(alwaysTrue), 0, cnt) | + // +---+------+------------+--------------------------------+ + // | 0 | 2 | true | 2 | + // | 1 | null | null | 0 | <--- correct result + // +---+------+------------+--------------------------------+ + if (groupingExpressions.isEmpty && handleCountBug) { + // Evaluate the aggregate expressions with zero tuples. + val resultMap = RewriteCorrelatedScalarSubquery.evalAggregateOnZeroTups(newAggregate) + val alwaysTrue = Alias(Literal.TrueLiteral, "alwaysTrue")() + val alwaysTrueRef = alwaysTrue.toAttribute.withNullability(true) + val expressions = ArrayBuffer.empty[NamedExpression] + // Create new aliases for aggregate expressions that have non-null default + // values and reconstruct the output with the `alwaysTrue` marker. + val projectList = newAggregate.aggregateExpressions.map { a => + resultMap.get(a.exprId) match { + // Aggregate expression is not subject to the count bug. + case Some(Literal(null, _)) | None => + expressions += a + // The attribute is nullable since it is from the right-hand side of a + // left outer join. + a.toAttribute.withNullability(true) + case Some(default) => + val newAttr = a.newInstance() Review comment: Yes this holds because if `a` is an attribute, evaluating it with zero tuples will yield null: https://github.com/apache/spark/blob/c46342e3d057bcc949b4caf016514ff05e0a1ebd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala#L407 Another possibility is OuterReference. Let me verify whether an OuterReference works correctly in this case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org