vadimpiven commented on code in PR #18848:
URL: https://github.com/apache/datafusion/pull/18848#discussion_r2547718913
##########
datafusion/optimizer/src/push_down_filter.rs:
##########
@@ -418,6 +418,204 @@ fn extract_or_clause(expr: &Expr, schema_columns:
&HashSet<Column>) -> Option<Ex
predicate
}
+/// Tracks coalesce predicates that can be pushed to each side of a FULL JOIN.
+struct PushDownCoalesceFilterHelper {
+ join_keys: Vec<(Column, Column)>,
+ left_filters: Vec<Expr>,
+ right_filters: Vec<Expr>,
+ remaining_filters: Vec<Expr>,
+}
+
+impl PushDownCoalesceFilterHelper {
+ fn new(join_keys: &[(Expr, Expr)]) -> Self {
+ let join_keys = join_keys
+ .iter()
+ .filter_map(|(lhs, rhs)| {
+ Some((lhs.try_as_col()?.clone(), rhs.try_as_col()?.clone()))
+ })
+ .collect();
+ Self {
+ join_keys,
+ left_filters: Vec::new(),
+ right_filters: Vec::new(),
+ remaining_filters: Vec::new(),
+ }
+ }
+
+ fn push_columns<F: FnMut(Expr) -> Expr>(
+ &mut self,
+ columns: (Column, Column),
+ mut build_filter: F,
+ ) {
+ self.left_filters
+ .push(build_filter(Expr::Column(columns.0)));
+ self.right_filters
+ .push(build_filter(Expr::Column(columns.1)));
+ }
+
+ fn extract_join_columns(&self, expr: &Expr) -> Option<(Column, Column)> {
+ if let Expr::ScalarFunction(ScalarFunction { func, args }) = expr {
+ if func.name() != "coalesce" {
+ return None;
+ }
+ if let [Expr::Column(lhs), Expr::Column(rhs)] = args.as_slice() {
+ for (join_lhs, join_rhs) in &self.join_keys {
+ if join_lhs == lhs && join_rhs == rhs {
+ return Some((lhs.clone(), rhs.clone()));
+ }
+ if join_lhs == rhs && join_rhs == lhs {
+ return Some((rhs.clone(), lhs.clone()));
+ }
+ }
+ }
+ }
+ None
+ }
+
+ fn push_term(&mut self, term: &Expr) {
+ match term {
+ Expr::BinaryExpr(BinaryExpr { left, op, right })
+ if op.supports_propagation() =>
+ {
+ if let Some(columns) = self.extract_join_columns(left) {
+ return self.push_columns(columns, |replacement| {
+ Expr::BinaryExpr(BinaryExpr {
+ left: Box::new(replacement),
+ op: *op,
+ right: right.clone(),
+ })
+ });
+ }
+ if let Some(columns) = self.extract_join_columns(right) {
+ return self.push_columns(columns, |replacement| {
+ Expr::BinaryExpr(BinaryExpr {
+ left: left.clone(),
+ op: *op,
+ right: Box::new(replacement),
+ })
+ });
+ }
+ }
+ Expr::IsNull(expr) => {
+ if let Some(columns) = self.extract_join_columns(expr) {
+ return self.push_columns(columns, |replacement| {
+ Expr::IsNull(Box::new(replacement))
+ });
+ }
+ }
+ Expr::IsNotNull(expr) => {
+ if let Some(columns) = self.extract_join_columns(expr) {
+ return self.push_columns(columns, |replacement| {
+ Expr::IsNotNull(Box::new(replacement))
+ });
+ }
+ }
+ Expr::IsTrue(expr) => {
+ if let Some(columns) = self.extract_join_columns(expr) {
+ return self.push_columns(columns, |replacement| {
+ Expr::IsTrue(Box::new(replacement))
+ });
+ }
+ }
+ Expr::IsFalse(expr) => {
+ if let Some(columns) = self.extract_join_columns(expr) {
+ return self.push_columns(columns, |replacement| {
+ Expr::IsFalse(Box::new(replacement))
+ });
+ }
+ }
+ Expr::IsUnknown(expr) => {
+ if let Some(columns) = self.extract_join_columns(expr) {
+ return self.push_columns(columns, |replacement| {
+ Expr::IsUnknown(Box::new(replacement))
+ });
+ }
+ }
+ Expr::IsNotTrue(expr) => {
+ if let Some(columns) = self.extract_join_columns(expr) {
+ return self.push_columns(columns, |replacement| {
+ Expr::IsNotTrue(Box::new(replacement))
+ });
+ }
+ }
+ Expr::IsNotFalse(expr) => {
+ if let Some(columns) = self.extract_join_columns(expr) {
+ return self.push_columns(columns, |replacement| {
+ Expr::IsNotFalse(Box::new(replacement))
+ });
+ }
+ }
+ Expr::IsNotUnknown(expr) => {
+ if let Some(columns) = self.extract_join_columns(expr) {
+ return self.push_columns(columns, |replacement| {
+ Expr::IsNotUnknown(Box::new(replacement))
+ });
+ }
+ }
+ Expr::Between(between) => {
+                if let Some(columns) = self.extract_join_columns(&between.expr) {
+ return self.push_columns(columns, |replacement| {
+ Expr::Between(Between {
+ expr: Box::new(replacement),
+ negated: between.negated,
+ low: between.low.clone(),
+ high: between.high.clone(),
+ })
+ });
+ }
+ }
+ Expr::InList(in_list) => {
+                if let Some(columns) = self.extract_join_columns(&in_list.expr) {
+ return self.push_columns(columns, |replacement| {
+ Expr::InList(InList {
+ expr: Box::new(replacement),
+ list: in_list.list.clone(),
+ negated: in_list.negated,
+ })
+ });
+ }
+ }
+ _ => {}
+ }
+ self.remaining_filters.push(term.clone());
+ }
+
+ fn push_predicate(
+ mut self,
+ predicate: Expr,
+ ) -> Result<(Option<Expr>, Option<Expr>, Vec<Expr>)> {
+ let predicates = split_conjunction_owned(predicate);
+ let terms = simplify_predicates(predicates)?;
+ for term in terms {
+ self.push_term(&term);
+ }
+ Ok((
+ conjunction(self.left_filters),
+ conjunction(self.right_filters),
+ self.remaining_filters,
+ ))
+ }
+}
+
+fn push_full_join_coalesce_filters(
Review Comment:
Sorry, I forgot to change the name. I am using this optimization in my code
specifically for chains (up to 50 tables long) of FULL OUTER JOINs. I build each
join as the sequence join -> project with coalesce over join keys -> alias, like:
```rust
let plan = LogicalPlanBuilder::scan("t1", scan.clone(), None)?
.join(
LogicalPlanBuilder::scan("t2", scan.clone(), None)?.build()?,
JoinType::Full,
(vec!["a"], vec!["a"]),
None,
)?
.project(vec![
coalesce(vec![col("t1.a"), col("t2.a")]).alias("a"),
col("t1.b").alias("b1"),
col("t2.b").alias("b2"),
])?
.alias("j1")?
.build()?;
```
This way the initial data which looks like
```json
{
"table1": {
"1": 100,
"2": 200,
"3": 300
},
"table2": {
"2": 2000,
"3": 3000,
"4": 4000
},
"table3": {
"3": 30000,
"4": 40000,
"5": 50000
  }
}
```
is joined into
| key | table1 | table2 | table3 |
|-----|-------|-------|--------|
| 1 | 100 | null | null |
| 2 | 200 | 2000 | null |
| 3 | 300 | 3000 | 30000 |
| 4 | null | 4000 | 40000 |
| 5 | null | null | 50000 |
instead of
| key1 | key2 | key3 | table1 | table2 | table3 |
|----- |----- |----- |-------|-------|--------|
| 1 | null | null | 100 | null | null |
| 2 | 2 | null | 200 | 2000 | null |
| 3 | 3 | 3 | 300 | 3000 | 30000 |
| null | 4 | 4 | null | 4000 | 40000 |
| null | null | 5 | null | null | 50000 |
You can check the illustration
https://docs.platforma.bio/guides/vdj-analysis/diversity-analysis/#results-table
where different sample properties are joined by Sample Id from different
parquet files.
When I apply a filter on the key, what I effectively want is to replicate this
filter to all of the input tables, and the optimization that I provided does exactly that.
I am applying the chain **join->project with coalesce over join keys ->
alias** for each new table, so for 50 tables I would have 49 projections with
coalesce. Without my optimization, each optimizer pass runs a simplification
step that turns `coalesce` into `CASE`, and then performs push-down, which turns
the `CASE` back into `coalesce`. So 1 optimizer pass gives me propagation through 1 layer,
and for 50 tables I would need 49 optimizer passes for full
propagation. The optimization in this PR makes it possible to optimize such a scenario in 1
optimizer pass.
I realized that this optimization seems correct for any type of join if
coalesce is applied to the join keys, so I do not have an explicit check for FULL
OUTER JOIN in the proposed code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]