sigmod commented on code in PR #36080:
URL: https://github.com/apache/spark/pull/36080#discussion_r844821659


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -132,28 +131,38 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
       REGEXP_EXTRACT_FAMILY, REGEXP_REPLACE)
   }
 
-  private def canFilterLeft(joinType: JoinType): Boolean = joinType match {
-    case Inner | RightOuter => true
-    case _ => false
-  }
-
-  private def canFilterRight(joinType: JoinType): Boolean = joinType match {
-    case Inner | LeftOuter => true
-    case _ => false
-  }
-
   private def isProbablyShuffleJoin(left: LogicalPlan,
       right: LogicalPlan, hint: JoinHint): Boolean = {
     !hintToBroadcastLeft(hint) && !hintToBroadcastRight(hint) &&
       !canBroadcastBySize(left, conf) && !canBroadcastBySize(right, conf)
   }
 
-  private def probablyHasShuffle(plan: LogicalPlan): Boolean = {
-    plan.collectFirst {
-      case j@Join(left, right, _, _, hint)
-        if isProbablyShuffleJoin(left, right, hint) => j
-      case a: Aggregate => a
-    }.nonEmpty
+  // Make sure injected filters could push through Shuffle, see 
PushPredicateThroughNonJoin
+  private def probablyPushThroughShuffle(exp: Expression, plan: LogicalPlan): 
Boolean = {
+    if (exp.references.isEmpty) return false

Review Comment:
   Just curious:  are you saying we can inject a Bloom filter as long as it can 
be pushed through a shuffle, but not necessarily to immediately after the scan?
   
   Alternatively, I was thinking that we could handle `Window` in 
`findExpressionAndTrackLineageDown`, and add a `Window` case to the original `probablyHasShuffle`? 



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/InjectRuntimeFilter.scala:
##########
@@ -132,28 +131,38 @@ object InjectRuntimeFilter extends Rule[LogicalPlan] with 
PredicateHelper with J
       REGEXP_EXTRACT_FAMILY, REGEXP_REPLACE)
   }
 
-  private def canFilterLeft(joinType: JoinType): Boolean = joinType match {
-    case Inner | RightOuter => true
-    case _ => false
-  }
-
-  private def canFilterRight(joinType: JoinType): Boolean = joinType match {
-    case Inner | LeftOuter => true
-    case _ => false
-  }
-
   private def isProbablyShuffleJoin(left: LogicalPlan,
       right: LogicalPlan, hint: JoinHint): Boolean = {
     !hintToBroadcastLeft(hint) && !hintToBroadcastRight(hint) &&
       !canBroadcastBySize(left, conf) && !canBroadcastBySize(right, conf)
   }
 
-  private def probablyHasShuffle(plan: LogicalPlan): Boolean = {
-    plan.collectFirst {
-      case j@Join(left, right, _, _, hint)
-        if isProbablyShuffleJoin(left, right, hint) => j
-      case a: Aggregate => a
-    }.nonEmpty
+  // Make sure injected filters could push through Shuffle, see 
PushPredicateThroughNonJoin
+  private def probablyPushThroughShuffle(exp: Expression, plan: LogicalPlan): 
Boolean = {
+    if (exp.references.isEmpty) return false
+
+    plan match {
+      case Join(left, right, _, _, hint) if isProbablyShuffleJoin(left, right, 
hint) &&
+        (exp.references.subsetOf(left.outputSet) || 
exp.references.subsetOf(right.outputSet)) =>
+        true
+      case a: Aggregate if a.aggregateExpressions.forall(_.deterministic) &&

Review Comment:
   What's the difference between this and the next Aggregate case branch?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to