jackwener commented on code in PR #4365:
URL: https://github.com/apache/arrow-datafusion/pull/4365#discussion_r1035650796


##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -500,302 +387,359 @@ fn optimize_join(
     //      vector will contain only join keys (without additional
     //      element representing filter).
     let expr = plan.expressions();
-    let expr = if !on_filter.is_empty() && on_to_keep.is_empty() {
+    let expr = if !on_filter_empty && keep_condition.is_empty() {
         // New filter expression is None - should remove last element
         expr[..expr.len() - 1].to_vec()
-    } else if !on_to_keep.is_empty() {
+    } else if !keep_condition.is_empty() {
         // Replace last element with new filter expression
         expr[..expr.len() - 1]
             .iter()
             .cloned()
-            .chain(once(on_to_keep.into_iter().reduce(Expr::and).unwrap()))
+            .chain(once(keep_condition.into_iter().reduce(Expr::and).unwrap()))
             .collect()
     } else {
         plan.expressions()
     };
     let plan = from_plan(plan, &expr, &[left, right])?;
 
-    if to_keep.0.is_empty() {
+    if keep_predicates.is_empty() {
         Ok(plan)
     } else {
         // wrap the join on the filter whose predicates must be kept
-        let plan = utils::add_filter(plan, &to_keep.0)?;
-        state.filters = remove_filters(&state.filters, &to_keep.1);
-
-        Ok(plan)
+        match conjunction(keep_predicates) {
+            Some(predicate) => Ok(LogicalPlan::Filter(Filter::try_new(
+                predicate,
+                Arc::new(plan),
+            )?)),
+            None => Ok(plan),
+        }
     }
 }
 
-fn optimize(plan: &LogicalPlan, mut state: State) -> Result<LogicalPlan> {
-    match plan {
-        LogicalPlan::Explain { .. } => {
-            // push the optimization to the plan of this explain
-            push_down(&state, plan)
-        }
-        LogicalPlan::Analyze { .. } => push_down(&state, plan),
-        LogicalPlan::Filter(filter) => {
-            let predicate = utils::cnf_rewrite(filter.predicate().clone());
-
-            utils::split_conjunction_owned(predicate)
-                .into_iter()
-                .try_for_each::<_, Result<()>>(|predicate| {
-                    let columns = predicate.to_columns()?;
-                    state.filters.push((predicate, columns));
-                    Ok(())
-                })?;
-
-            optimize(filter.input(), state)
+fn push_down_join(
+    plan: &LogicalPlan,
+    join: &Join,
+    parent_predicate: Option<&Expr>,
+) -> Result<Option<LogicalPlan>> {
+    let mut predicates = match parent_predicate {
+        Some(parent_predicate) => {
+            
utils::split_conjunction_owned(utils::cnf_rewrite(parent_predicate.clone()))
         }
-        LogicalPlan::Projection(Projection {
-            input,
-            expr,
-            schema,
-        }) => {
-            // A projection is filter-commutable, but re-writes all predicate 
expressions
-            // collect projection.
-            let projection = schema
-                .fields()
-                .iter()
-                .enumerate()
-                .flat_map(|(i, field)| {
-                    // strip alias, as they should not be part of filters
-                    let expr = match &expr[i] {
-                        Expr::Alias(expr, _) => expr.as_ref().clone(),
-                        expr => expr.clone(),
+        None => vec![],
+    };
+
+    // Convert JOIN ON predicate to Predicates
+    let on_filters = join
+        .filter
+        .as_ref()
+        .map(|e| utils::split_conjunction_owned(e.clone()))
+        .unwrap_or_else(Vec::new);
+
+    if join.join_type == JoinType::Inner {
+        // For inner joins, duplicate filters for joined columns so filters 
can be pushed down
+        // to both sides. Take the following query as an example:
+        //
+        // ```sql
+        // SELECT * FROM t1 JOIN t2 on t1.id = t2.uid WHERE t1.id > 1
+        // ```
+        //
+        // `t1.id > 1` predicate needs to be pushed down to t1 table scan, 
while
+        // `t2.uid > 1` predicate needs to be pushed down to t2 table scan.
+        //
+        // Join clauses with `Using` constraints also take advantage of this 
logic to make sure
+        // predicates reference the shared join columns are pushed to both 
sides.
+        // This logic should also been applied to conditions in JOIN ON clause
+        let join_side_filters = predicates
+            .iter()
+            .chain(on_filters.iter())
+            .filter_map(|predicate| {
+                let mut join_cols_to_replace = HashMap::new();
+                let columns = match predicate.to_columns() {
+                    Ok(columns) => columns,
+                    Err(e) => return Some(Err(e)),
+                };
+
+                for col in columns.iter() {
+                    for (l, r) in join.on.iter() {
+                        if col == l {
+                            join_cols_to_replace.insert(col, r);
+                            break;
+                        } else if col == r {
+                            join_cols_to_replace.insert(col, l);
+                            break;
+                        }
+                    }
+                }

Review Comment:
   > There is `break` to return the inner loop. I think for join conditions: 
`on(a.id = b.id and a.id = b.id2) where b.id = 10`, we should be able to infer 
more equality predicates.
   
   This should be a common optimization (`infer condition`), since many rules about `join` 
need it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to