HyukjinKwon commented on a change in pull request #33232:
URL: https://github.com/apache/spark/pull/33232#discussion_r665029120



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -1442,6 +1442,12 @@ object PushPredicateThroughNonJoin extends Rule[LogicalPlan] with PredicateHelpe
       pushDownPredicate(filter, u.child) { predicate =>
         u.withNewChildren(Seq(Filter(predicate, u.child)))
       }
+
+    // Push down filter predicates when the Filter's child is a TypedFilter.
+    // In this scenario, to push the filter predicates down, the Filter needs
+    // to be pushed beneath the TypedFilter.
+    case Filter(condition, typeFilter @ TypedFilter(_, _, _, _, _)) =>
+      typeFilter.copy(child = Filter(condition, typeFilter.child))

Review comment:
       The only thing that makes me worried is that `TypedFilter` takes an 
arbitrary function, which can depend on, e.g., the number of records filtered 
so far or any other global state for the task. Once the order is switched, 
that logic can break, e.g.:
   
   ```scala
   // A stateful, per-task predicate: whether a row passes depends on how
   // many rows have already been filtered by this task.
   object RecordFilteredPerTask {
     var count = 0
   }
   
   df.filter { v =>
     RecordFilteredPerTask.count += 1
     RecordFilteredPerTask.count > 5
   }
   ```
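
   To make this more concrete, here is a rough, self-contained sketch (names 
like `RowsSeenPerTask` and `TypedFilterOrderSketch`, and the `spark.range` 
data, are just illustrative, not from this PR). It builds exactly the 
`Filter(condition, TypedFilter(...))` shape the new case matches; because the 
typed predicate is stateful, switching the order can change which rows 
survive:

   ```scala
   import org.apache.spark.sql.SparkSession
   
   // Illustrative per-task mutable state (same idea as RecordFilteredPerTask):
   // whether a row passes depends on how many rows have already been seen.
   object RowsSeenPerTask {
     var count = 0
   }
   
   object TypedFilterOrderSketch {
     def main(args: Array[String]): Unit = {
       val spark = SparkSession.builder()
         .master("local[1]")
         .appName("typed-filter-order-sketch")
         .getOrCreate()
       import spark.implicits._
   
       val ds = spark.range(1, 11).as[Long]
   
       // As written, the stateful typed filter sees every row first,
       // and the column predicate only filters what remains.
       val query = ds
         .filter { _ =>
           RowsSeenPerTask.count += 1
           RowsSeenPerTask.count > 5   // passes only after 5 rows were observed
         }
         .filter($"id" > 2)
   
       // If the rule rewrites Filter(TypedFilter(child)) into
       // TypedFilter(Filter(child)), the column predicate runs first, the
       // stateful function sees fewer rows, and the result can change.
       query.explain(true)
       query.show()
   
       spark.stop()
     }
   }
   ```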




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
