vadimpiven commented on code in PR #18848:
URL: https://github.com/apache/datafusion/pull/18848#discussion_r2547718913


##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -418,6 +418,204 @@ fn extract_or_clause(expr: &Expr, schema_columns: &HashSet<Column>) -> Option<Ex
     predicate
 }
 
+/// Working state used to split a filter over a join whose output exposes
+/// `coalesce(left_key, right_key)` for its join keys.
+///
+/// Conjuncts that constrain only such a `coalesce` call are rewritten once per
+/// join input; every other conjunct is collected in `remaining_filters`.
+struct PushDownCoalesceFilterHelper {
+    /// Conjuncts rewritten against the left input's key column.
+    left_filters: Vec<Expr>,
+    /// Conjuncts rewritten against the right input's key column.
+    right_filters: Vec<Expr>,
+    /// Conjuncts that could not be rewritten for either input.
+    remaining_filters: Vec<Expr>,
+    /// Join key pairs whose two sides are plain columns, stored as
+    /// `(left column, right column)`.
+    join_keys: Vec<(Column, Column)>,
+}
+
+impl PushDownCoalesceFilterHelper {
+    /// Creates an empty helper for the given join `on` pairs.
+    ///
+    /// Only pairs whose two sides are plain column references are retained;
+    /// any other key expression can never match a `coalesce(col, col)` call.
+    fn new(join_keys: &[(Expr, Expr)]) -> Self {
+        let join_keys = join_keys
+            .iter()
+            .filter_map(|(lhs, rhs)| {
+                Some((lhs.try_as_col()?.clone(), rhs.try_as_col()?.clone()))
+            })
+            .collect();
+        Self {
+            join_keys,
+            left_filters: Vec::new(),
+            right_filters: Vec::new(),
+            remaining_filters: Vec::new(),
+        }
+    }
+
+    /// Returns `true` if `expr` may be duplicated into a filter on each join
+    /// input.
+    ///
+    /// The rewrite `P(coalesce(l, r), c)` -> `P(l, c)` / `P(r, c)` is only
+    /// sound when `c` does not vary per row. So `c` must not reference any
+    /// column: that would make the result depend on the row, and the filter
+    /// pushed to one input could reference the other input's schema. It must
+    /// also not be volatile, because each pushed copy would be evaluated
+    /// independently.
+    fn is_pushable_operand(expr: &Expr) -> bool {
+        !expr.any_column_refs() && !expr.is_volatile()
+    }
+
+    /// Records one filter per join input, built by substituting the left key
+    /// column and then the right key column for the matched `coalesce` call.
+    fn push_columns<F: FnMut(Expr) -> Expr>(
+        &mut self,
+        columns: (Column, Column),
+        mut build_filter: F,
+    ) {
+        let (left, right) = columns;
+        self.left_filters.push(build_filter(Expr::Column(left)));
+        self.right_filters.push(build_filter(Expr::Column(right)));
+    }
+
+    /// Matches `coalesce(a, b)` where `(a, b)` is one of the join key pairs,
+    /// in either argument order.
+    ///
+    /// Returns the pair ordered as `(left key, right key)`, or `None` if
+    /// `expr` is not such a call.
+    fn extract_join_columns(&self, expr: &Expr) -> Option<(Column, Column)> {
+        let Expr::ScalarFunction(ScalarFunction { func, args }) = expr else {
+            return None;
+        };
+        if func.name() != "coalesce" {
+            return None;
+        }
+        let [Expr::Column(first), Expr::Column(second)] = args.as_slice() else {
+            return None;
+        };
+        self.join_keys.iter().find_map(|(join_lhs, join_rhs)| {
+            if join_lhs == first && join_rhs == second {
+                Some((first.clone(), second.clone()))
+            } else if join_lhs == second && join_rhs == first {
+                // `coalesce(right, left)`: normalize back to (left, right).
+                Some((second.clone(), first.clone()))
+            } else {
+                None
+            }
+        })
+    }
+
+    /// Handles single-argument predicates such as `IS NULL`.
+    ///
+    /// If `inner` is a join-key `coalesce`, pushes `wrap(key)` to both inputs.
+    /// `wrap` is the `Expr` variant constructor, e.g. `Expr::IsNull`.
+    /// Returns `true` if the predicate was pushed.
+    fn try_push_unary(
+        &mut self,
+        inner: &Expr,
+        wrap: fn(Box<Expr>) -> Expr,
+    ) -> bool {
+        let Some(columns) = self.extract_join_columns(inner) else {
+            return false;
+        };
+        self.push_columns(columns, |replacement| wrap(Box::new(replacement)));
+        true
+    }
+
+    /// Tries to rewrite one conjunct into a filter for each join input.
+    ///
+    /// Returns `true` if the conjunct was pushed, and `false` if the caller
+    /// must keep it as a remaining filter.
+    fn try_push_term(&mut self, term: &Expr) -> bool {
+        match term {
+            Expr::BinaryExpr(BinaryExpr { left, op, right })
+                if op.supports_propagation() =>
+            {
+                if Self::is_pushable_operand(right) {
+                    if let Some(columns) = self.extract_join_columns(left) {
+                        self.push_columns(columns, |replacement| {
+                            Expr::BinaryExpr(BinaryExpr {
+                                left: Box::new(replacement),
+                                op: *op,
+                                right: right.clone(),
+                            })
+                        });
+                        return true;
+                    }
+                }
+                if Self::is_pushable_operand(left) {
+                    if let Some(columns) = self.extract_join_columns(right) {
+                        self.push_columns(columns, |replacement| {
+                            Expr::BinaryExpr(BinaryExpr {
+                                left: left.clone(),
+                                op: *op,
+                                right: Box::new(replacement),
+                            })
+                        });
+                        return true;
+                    }
+                }
+                false
+            }
+            Expr::IsNull(inner) => self.try_push_unary(inner, Expr::IsNull),
+            Expr::IsNotNull(inner) => self.try_push_unary(inner, Expr::IsNotNull),
+            Expr::IsTrue(inner) => self.try_push_unary(inner, Expr::IsTrue),
+            Expr::IsFalse(inner) => self.try_push_unary(inner, Expr::IsFalse),
+            Expr::IsUnknown(inner) => self.try_push_unary(inner, Expr::IsUnknown),
+            Expr::IsNotTrue(inner) => self.try_push_unary(inner, Expr::IsNotTrue),
+            Expr::IsNotFalse(inner) => {
+                self.try_push_unary(inner, Expr::IsNotFalse)
+            }
+            Expr::IsNotUnknown(inner) => {
+                self.try_push_unary(inner, Expr::IsNotUnknown)
+            }
+            Expr::Between(between) => {
+                if !(Self::is_pushable_operand(&between.low)
+                    && Self::is_pushable_operand(&between.high))
+                {
+                    return false;
+                }
+                let Some(columns) = self.extract_join_columns(&between.expr) else {
+                    return false;
+                };
+                self.push_columns(columns, |replacement| {
+                    Expr::Between(Between {
+                        expr: Box::new(replacement),
+                        negated: between.negated,
+                        low: between.low.clone(),
+                        high: between.high.clone(),
+                    })
+                });
+                true
+            }
+            Expr::InList(in_list) => {
+                if !in_list.list.iter().all(Self::is_pushable_operand) {
+                    return false;
+                }
+                let Some(columns) = self.extract_join_columns(&in_list.expr) else {
+                    return false;
+                };
+                self.push_columns(columns, |replacement| {
+                    Expr::InList(InList {
+                        expr: Box::new(replacement),
+                        list: in_list.list.clone(),
+                        negated: in_list.negated,
+                    })
+                });
+                true
+            }
+            _ => false,
+        }
+    }
+
+    /// Routes one conjunct: it is pushed to both inputs if possible, and
+    /// otherwise kept in `remaining_filters`.
+    fn push_term(&mut self, term: &Expr) {
+        if !self.try_push_term(term) {
+            self.remaining_filters.push(term.clone());
+        }
+    }
+
+    /// Splits `predicate` into conjuncts, simplifies them, and routes each
+    /// one to the left input, the right input, or the remaining set.
+    ///
+    /// Returns `(left filter, right filter, remaining conjuncts)`. The first
+    /// two are the `conjunction` of the filters collected for that side.
+    ///
+    /// # Errors
+    /// Propagates any error from `simplify_predicates`.
+    fn push_predicate(
+        mut self,
+        predicate: Expr,
+    ) -> Result<(Option<Expr>, Option<Expr>, Vec<Expr>)> {
+        let predicates = split_conjunction_owned(predicate);
+        for term in simplify_predicates(predicates)? {
+            self.push_term(&term);
+        }
+        Ok((
+            conjunction(self.left_filters),
+            conjunction(self.right_filters),
+            self.remaining_filters,
+        ))
+    }
+}
+
+fn push_full_join_coalesce_filters(

Review Comment:
   Sorry, I forgot to change the name. I use this optimization in my code
   specifically for chains of FULL OUTER JOINs (up to 50 tables long). Each join
   is built as the sequence join -> project with coalesce over the join keys ->
   alias, like:
   
   ```rust
   let plan = LogicalPlanBuilder::scan("t1", scan.clone(), None)?
       .join(
           LogicalPlanBuilder::scan("t2", scan.clone(), None)?.build()?,
           JoinType::Full,
           (vec!["a"], vec!["a"]),
           None,
       )?
       .project(vec![
           coalesce(vec![col("t1.a"), col("t2.a")]).alias("a"),
           col("t1.b").alias("b1"),
           col("t2.b").alias("b2"),
       ])?
       .alias("j1")?
       .build()?;
   ```
   
   This way the initial data which looks like
   
   ```json
   {
     "table1": {
       "1": 100,
       "2": 200,
       "3": 300
     },
     "table2": {
       "2": 2000,
       "3": 3000,
       "4": 4000
     },
     "table3": {
       "3": 30000,
       "4": 40000,
       "5": 50000
     }
   }
   ```
   
   is joined into
   
   | key | table1 | table2 | table3 |
   |-----|-------|-------|--------|
   | 1      | 100     | null     | null      |
   | 2      | 200    | 2000  | null      |
   | 3      | 300    | 3000  | 30000 |
   | 4      | null     | 4000  | 40000 |
   | 5      | null     | null     | 50000 |
   
   instead of 
   
   | key1 | key2 | key3 | table1 | table2 | table3 |
   |----- |----- |----- |-------|-------|--------|
   | 1       | null  | null   | 100     | null     | null      |
   | 2       | 2      | null  | 200    | 2000  | null      |
   | 3       | 3      | 3      | 300    | 3000  | 30000 |
   | null   | 4      | 4      | null     | 4000  | 40000 |
   | null   | null  | 5      | null     | null     | 50000 |
   
   You can see an illustration at
   https://docs.platforma.bio/guides/vdj-analysis/diversity-analysis/#results-table,
   where different sample properties from different Parquet files are joined by
   Sample Id.
   
   When I apply a filter on the key, what I effectively want is to replicate
   that filter to every input table, and the optimization I provided does
   exactly that.
   
   I apply the chain **join -> project with coalesce over join keys -> alias**
   for each new table, so 50 tables yield 49 projections with `coalesce`.
   Without my optimization, each optimizer pass first simplifies `coalesce` into
   `CASE` and then runs filter push-down, which turns the `CASE` back into
   `coalesce`. As a result, one optimizer pass propagates the filter through
   only one layer, and 50 tables would need 49 optimizer passes for full
   propagation. The optimization in this PR handles this scenario in a single
   optimizer pass.
   
   I realized that this optimization appears to be correct for any join type as
   long as `coalesce` is applied to the join keys, so the proposed code has no
   explicit check for FULL OUTER JOIN.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to