mingmwang commented on code in PR #4365:
URL: https://github.com/apache/arrow-datafusion/pull/4365#discussion_r1034284043


##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -500,302 +387,344 @@ fn optimize_join(
     //      vector will contain only join keys (without additional
     //      element representing filter).
     let expr = plan.expressions();
-    let expr = if !on_filter.is_empty() && on_to_keep.is_empty() {
+    let expr = if !on_filter_empty && keep_condition.is_empty() {
         // New filter expression is None - should remove last element
         expr[..expr.len() - 1].to_vec()
-    } else if !on_to_keep.is_empty() {
+    } else if !keep_condition.is_empty() {
         // Replace last element with new filter expression
         expr[..expr.len() - 1]
             .iter()
             .cloned()
-            .chain(once(on_to_keep.into_iter().reduce(Expr::and).unwrap()))
+            .chain(once(keep_condition.into_iter().reduce(Expr::and).unwrap()))
             .collect()
     } else {
         plan.expressions()
     };
     let plan = from_plan(plan, &expr, &[left, right])?;
 
-    if to_keep.0.is_empty() {
+    if keep_predicates.is_empty() {
         Ok(plan)
     } else {
         // wrap the join on the filter whose predicates must be kept
-        let plan = utils::add_filter(plan, &to_keep.0)?;
-        state.filters = remove_filters(&state.filters, &to_keep.1);
-
-        Ok(plan)
+        match conjunction(keep_predicates) {
+            Some(predicate) => Ok(LogicalPlan::Filter(Filter::try_new(
+                predicate,
+                Arc::new(plan),
+            )?)),
+            None => Ok(plan),
+        }
     }
 }
 
-fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
-    match plan {
-        LogicalPlan::Explain { .. } => {
-            // push the optimization to the plan of this explain
-            push_down(&state, plan)
-        }
-        LogicalPlan::Analyze { .. } => push_down(&state, plan),
-        LogicalPlan::Filter(filter) => {
-            let predicate = utils::cnf_rewrite(filter.predicate().clone());
-
-            utils::split_conjunction_owned(predicate)
-                .into_iter()
-                .try_for_each::<_, Result<()>>(|predicate| {
-                    let columns = predicate.to_columns()?;
-                    state.filters.push((predicate, columns));
-                    Ok(())
-                })?;
-
-            optimize(filter.input(), state)
+fn push_down_join(
+    plan: &LogicalPlan,
+    join: &Join,
+    parent_predicate: Option<&Expr>,
+) -> Result<Option<LogicalPlan>> {
+    let mut predicates = match parent_predicate {
+        Some(parent_predicate) => {
+            
utils::split_conjunction_owned(utils::cnf_rewrite(parent_predicate.clone()))
         }
-        LogicalPlan::Projection(Projection {
-            input,
-            expr,
-            schema,
-        }) => {
-            // A projection is filter-commutable, but re-writes all predicate 
expressions
-            // collect projection.
-            let projection = schema
-                .fields()
-                .iter()
-                .enumerate()
-                .flat_map(|(i, field)| {
-                    // strip alias, as they should not be part of filters
-                    let expr = match &expr[i] {
-                        Expr::Alias(expr, _) => expr.as_ref().clone(),
-                        expr => expr.clone(),
+        None => vec![],
+    };
+
+    // Convert JOIN ON predicate to Predicates
+    let on_filters = join
+        .filter
+        .as_ref()
+        .map(|e| utils::split_conjunction_owned(e.clone()))
+        .unwrap_or_else(Vec::new);
+
+    if join.join_type == JoinType::Inner {
+        // For inner joins, duplicate filters for joined columns so filters 
can be pushed down

Review Comment:
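   And for outer joins:
   For a left outer join, given the query `SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON (t1_id = t2_id AND t2_id >= 100);`, the predicate `t2_id >= 100` can be pushed down to the right side. It references only the non-preserved (right) input, and every row of `t1` is still emitted whether or not it finds a match.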
   
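   For a right outer join, given the query `SELECT t1_id, t1_name, t2_name FROM t1 RIGHT JOIN t2 ON (t1_id = t2_id AND t1_id >= 100);`, the predicate `t1_id >= 100` can be pushed down to the left side. It references only the non-preserved (left) input, and every row of `t2` is still emitted.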
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to