sigmod commented on code in PR #36080: URL: https://github.com/apache/spark/pull/36080#discussion_r844821659
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala: ########## @@ -132,28 +131,38 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J REGEXP_EXTRACT_FAMILY, REGEXP_REPLACE) } - private def canFilterLeft(joinType: JoinType): Boolean = joinType match { - case Inner | RightOuter => true - case _ => false - } - - private def canFilterRight(joinType: JoinType): Boolean = joinType match { - case Inner | LeftOuter => true - case _ => false - } - private def isProbablyShuffleJoin(left: LogicalPlan, right: LogicalPlan, hint: JoinHint): Boolean = { !hintToBroadcastLeft(hint) && !hintToBroadcastRight(hint) && !canBroadcastBySize(left, conf) && !canBroadcastBySize(right, conf) } - private def probablyHasShuffle(plan: LogicalPlan): Boolean = { - plan.collectFirst { - case j@Join(left, right, _, _, hint) - if isProbablyShuffleJoin(left, right, hint) => j - case a: Aggregate => a - }.nonEmpty + // Make sure injected filters could push through Shuffle, see PushPredicateThroughNonJoin + private def probablyPushThroughShuffle(exp: Expression, plan: LogicalPlan): Boolean = { + if (exp.references.isEmpty) return false Review Comment: Just curious: are you saying we can inject a Bloom filter as long as it can be pushed through a shuffle, even if it cannot be pushed all the way down to immediately after the scan? Alternatively, could we extend `findExpressionAndTrackLineageDown` to handle `Window`, and add a `Window` case to the original `probablyHasShuffle`?
########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala: ########## @@ -132,28 +131,38 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with PredicateHelper with J REGEXP_EXTRACT_FAMILY, REGEXP_REPLACE) } - private def canFilterLeft(joinType: JoinType): Boolean = joinType match { - case Inner | RightOuter => true - case _ => false - } - - private def canFilterRight(joinType: JoinType): Boolean = joinType match { - case Inner | LeftOuter => true - case _ => false - } - private def isProbablyShuffleJoin(left: LogicalPlan, right: LogicalPlan, hint: JoinHint): Boolean = { !hintToBroadcastLeft(hint) && !hintToBroadcastRight(hint) && !canBroadcastBySize(left, conf) && !canBroadcastBySize(right, conf) } - private def probablyHasShuffle(plan: LogicalPlan): Boolean = { - plan.collectFirst { - case j@Join(left, right, _, _, hint) - if isProbablyShuffleJoin(left, right, hint) => j - case a: Aggregate => a - }.nonEmpty + // Make sure injected filters could push through Shuffle, see PushPredicateThroughNonJoin + private def probablyPushThroughShuffle(exp: Expression, plan: LogicalPlan): Boolean = { + if (exp.references.isEmpty) return false + + plan match { + case Join(left, right, _, _, hint) if isProbablyShuffleJoin(left, right, hint) && + (exp.references.subsetOf(left.outputSet) || exp.references.subsetOf(right.outputSet)) => + true + case a: Aggregate if a.aggregateExpressions.forall(_.deterministic) && Review Comment: what's the difference with the next Aggregate case branch? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org