Github user gengliangwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20069#discussion_r158697347

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
    @@ -875,13 +875,15 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
           }

         case filter @ Filter(condition, watermark: EventTimeWatermark) =>
    -      // We can only push deterministic predicates which don't reference the watermark attribute.
    -      // We could in theory span() only on determinism and pull out deterministic predicates
    -      // on the watermark separately. But it seems unnecessary and a bit confusing to not simply
    -      // use the prefix as we do for nondeterminism in other cases.
    -
    -      val (pushDown, stayUp) = splitConjunctivePredicates(condition).span(
    -        p => p.deterministic && !p.references.contains(watermark.eventTime))
    +      val (pushDown, stayUp) = {
    +        val pushDownCondition: Expression => Boolean =
    +          p => p.deterministic && !p.references.contains(watermark.eventTime)
    +        if (SQLConf.get.outOfOrderPredicateEvaluationEnabled) {
    --- End diff --

    Nit: Create a function that accepts a condition of type `Expression => Boolean` and `predicates: Seq[Expression]`. `partitionByDeterminism` can then call that function with `e => e.deterministic` as the condition.
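    A minimal sketch of the suggested refactoring, not the code from the pull request. `Expr` is a stand-in for Catalyst's `Expression`, `splitByCondition` is a hypothetical name, and the `outOfOrder` parameter stands in for `SQLConf.get.outOfOrderPredicateEvaluationEnabled`. The diff is cut off before the body of the `if`, so the choice between `partition` and `span` below is an assumption.

```scala
// Stand-in for Catalyst's Expression, reduced to the fields this sketch needs.
final case class Expr(name: String, deterministic: Boolean, references: Set[String])

object PredicateSplit {
  // Hypothetical generic helper: splits `predicates` by an arbitrary condition.
  // Assumption: with out-of-order evaluation enabled, every matching predicate
  // may be selected (partition); otherwise only the matching prefix is (span).
  def splitByCondition(predicates: Seq[Expr], outOfOrder: Boolean)(
      condition: Expr => Boolean): (Seq[Expr], Seq[Expr]) =
    if (outOfOrder) predicates.partition(condition)
    else predicates.span(condition)

  // partitionByDeterminism becomes a thin wrapper passing `e => e.deterministic`.
  def partitionByDeterminism(
      predicates: Seq[Expr], outOfOrder: Boolean): (Seq[Expr], Seq[Expr]) =
    splitByCondition(predicates, outOfOrder)(e => e.deterministic)

  // The watermark case reuses the same helper with its stricter condition.
  def splitForWatermark(
      predicates: Seq[Expr], eventTime: String, outOfOrder: Boolean): (Seq[Expr], Seq[Expr]) =
    splitByCondition(predicates, outOfOrder)(
      p => p.deterministic && !p.references.contains(eventTime))
}
```

    With this shape, the `if` on the configuration flag lives in one place, and each caller only supplies its own condition.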