mingmwang commented on code in PR #4365:
URL: https://github.com/apache/arrow-datafusion/pull/4365#discussion_r1034277882


##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -500,302 +387,344 @@ fn optimize_join(
     //      vector will contain only join keys (without additional
     //      element representing filter).
     let expr = plan.expressions();
-    let expr = if !on_filter.is_empty() && on_to_keep.is_empty() {
+    let expr = if !on_filter_empty && keep_condition.is_empty() {
         // New filter expression is None - should remove last element
         expr[..expr.len() - 1].to_vec()
-    } else if !on_to_keep.is_empty() {
+    } else if !keep_condition.is_empty() {
         // Replace last element with new filter expression
         expr[..expr.len() - 1]
             .iter()
             .cloned()
-            .chain(once(on_to_keep.into_iter().reduce(Expr::and).unwrap()))
+            .chain(once(keep_condition.into_iter().reduce(Expr::and).unwrap()))
             .collect()
     } else {
         plan.expressions()
     };
     let plan = from_plan(plan, &expr, &[left, right])?;
 
-    if to_keep.0.is_empty() {
+    if keep_predicates.is_empty() {
         Ok(plan)
     } else {
         // wrap the join on the filter whose predicates must be kept
-        let plan = utils::add_filter(plan, &to_keep.0)?;
-        state.filters = remove_filters(&state.filters, &to_keep.1);
-
-        Ok(plan)
+        match conjunction(keep_predicates) {
+            Some(predicate) => Ok(LogicalPlan::Filter(Filter::try_new(
+                predicate,
+                Arc::new(plan),
+            )?)),
+            None => Ok(plan),
+        }
     }
 }
 
-fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
-    match plan {
-        LogicalPlan::Explain { .. } => {
-            // push the optimization to the plan of this explain
-            push_down(&state, plan)
-        }
-        LogicalPlan::Analyze { .. } => push_down(&state, plan),
-        LogicalPlan::Filter(filter) => {
-            let predicate = utils::cnf_rewrite(filter.predicate().clone());
-
-            utils::split_conjunction_owned(predicate)
-                .into_iter()
-                .try_for_each::<_, Result<()>>(|predicate| {
-                    let columns = predicate.to_columns()?;
-                    state.filters.push((predicate, columns));
-                    Ok(())
-                })?;
-
-            optimize(filter.input(), state)
+fn push_down_join(
+    plan: &LogicalPlan,
+    join: &Join,
+    parent_predicate: Option<&Expr>,
+) -> Result<Option<LogicalPlan>> {
+    let mut predicates = match parent_predicate {
+        Some(parent_predicate) => {
+            
utils::split_conjunction_owned(utils::cnf_rewrite(parent_predicate.clone()))
         }
-        LogicalPlan::Projection(Projection {
-            input,
-            expr,
-            schema,
-        }) => {
-            // A projection is filter-commutable, but re-writes all predicate 
expressions
-            // collect projection.
-            let projection = schema
-                .fields()
-                .iter()
-                .enumerate()
-                .flat_map(|(i, field)| {
-                    // strip alias, as they should not be part of filters
-                    let expr = match &expr[i] {
-                        Expr::Alias(expr, _) => expr.as_ref().clone(),
-                        expr => expr.clone(),
+        None => vec![],
+    };
+
+    // Convert JOIN ON predicate to Predicates
+    let on_filters = join
+        .filter
+        .as_ref()
+        .map(|e| utils::split_conjunction_owned(e.clone()))
+        .unwrap_or_else(Vec::new);
+
+    if join.join_type == JoinType::Inner {
+        // For inner joins, duplicate filters for joined columns so filters 
can be pushed down

Review Comment:
   Does the same logic can apply to LeftSemi and RightSemi join ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to