vadimpiven opened a new issue, #18830:
URL: https://github.com/apache/datafusion/issues/18830
### Is your feature request related to a problem or challenge?
I have a scenario where I want to perform a series of outer joins on the same
key. To achieve that, I apply coalesce to the join keys:
```sql
SELECT coalesce(coalesce(t1.a, t2.a), t3.a) as a
FROM t1
FULL OUTER JOIN t2 ON t1.a = t2.a
FULL OUTER JOIN t3 ON coalesce(t1.a, t2.a) = t3.a
```
The problems begin when I try to filter this table:
```sql
SELECT coalesce(coalesce(t1.a, t2.a), t3.a) as a
FROM t1
FULL OUTER JOIN t2 ON t1.a = t2.a
FULL OUTER JOIN t3 ON coalesce(t1.a, t2.a) = t3.a
WHERE coalesce(coalesce(t1.a, t2.a), t3.a) = 1
```
The standard `PushDownFilter` optimizer rule does not push down filters on
`coalesce`. But as far as I can tell, **push-down of the filter on coalesce over
join keys** is correct, so I propose adding this optimization to DataFusion.
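To illustrate why the rewrite looks safe, here is a minimal standalone sketch in plain Rust, with no DataFusion involved. The `Key` alias and all function names are hypothetical. It models each table as a list of nullable keys and compares filtering on `coalesce(left.a, right.a) = value` after the join with filtering both inputs on `a = value` before it:

```rust
/// Toy model: a table is a list of nullable join keys.
type Key = Option<i32>;

/// FULL OUTER JOIN of `left` and `right` on `left.a = right.a`.
/// NULL keys never match, mirroring SQL equality semantics.
fn full_outer_join(left: &[Key], right: &[Key]) -> Vec<(Key, Key)> {
    let mut out = Vec::new();
    let mut right_matched = vec![false; right.len()];
    for &l in left {
        let mut matched = false;
        for (i, &r) in right.iter().enumerate() {
            if l.is_some() && l == r {
                out.push((l, r));
                right_matched[i] = true;
                matched = true;
            }
        }
        // Unmatched left rows are padded with NULL on the right.
        if !matched {
            out.push((l, None));
        }
    }
    // Unmatched right rows are padded with NULL on the left.
    for (i, &r) in right.iter().enumerate() {
        if !right_matched[i] {
            out.push((None, r));
        }
    }
    out
}

/// Plan A: join first, then filter on `coalesce(left.a, right.a) = value`.
fn filter_after_join(left: &[Key], right: &[Key], value: i32) -> Vec<(Key, Key)> {
    full_outer_join(left, right)
        .into_iter()
        .filter(|&(l, r)| l.or(r) == Some(value))
        .collect()
}

/// Plan B: filter each input on `a = value`, then join.
fn filter_before_join(left: &[Key], right: &[Key], value: i32) -> Vec<(Key, Key)> {
    let keep = |keys: &[Key]| -> Vec<Key> {
        keys.iter().copied().filter(|&k| k == Some(value)).collect()
    };
    full_outer_join(&keep(left), &keep(right))
}
```

Calling both functions with the same inputs, for example left keys `[1, 2, NULL, 1]` and right keys `[1, 3, NULL]`, should yield the same rows up to ordering. This is a spot check under the toy model, not a proof.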
### Describe the solution you'd like
I believe the `PushDownFilter` optimizer rule could be extended to support
**push-down of the filter on coalesce over join keys** to the respective sides
of the join.
### Describe alternatives you've considered
I have developed an optimizer rule covering my narrow use case. It would be
great if someone could generalize it and add it to the DataFusion codebase.
<details>
<summary>Here goes the code (with documentation and tests)</summary>
```rust
//! Coalesce-aware filter push-down optimization for FULL OUTER JOINs.
//!
//! This rule extends `DataFusion`'s standard filter push-down by handling
//! filters on `coalesce` expressions that combine join keys from FULL OUTER
//! JOINs.
use std::{collections::HashSet, sync::Arc};
use ahash::RandomState;
use datafusion::{
common::{Column, Result, tree_node::Transformed},
functions::core::coalesce::CoalesceFunc,
logical_expr::{
BinaryExpr, Expr, Filter, JoinType, LogicalPlan,
expr::ScalarFunction,
utils::{conjunction, split_conjunction},
},
optimizer::{ApplyOrder, OptimizerConfig, OptimizerRule,
push_down_filter::PushDownFilter},
};
/// Optimizer rule for pushing filters through `coalesce` expressions in
/// FULL OUTER JOINs.
///
/// # Introduction
///
/// `DataFusion`'s standard [`PushDownFilter`] rule cannot push filters
/// through `coalesce` expressions because doing so is generally incorrect.
/// However, in FULL OUTER JOINs, `coalesce` is used to combine matching join
/// keys (e.g., `coalesce(left.id, right.id)`) where the columns either have
/// identical values or one is NULL. In this specific case, it is safe to
/// decompose a filter on the coalesce into separate filters on each input.
///
/// This rule enables partition pruning and early data elimination by
/// pushing such filters directly to both sides of the join.
///
/// For example, given a FULL JOIN with a coalesced join key:
///
/// ```text
/// Filter: coalesce(left.id, right.id) = 1
///   Projection: coalesce(left.id, right.id) AS id
///     Full Join: left.id = right.id
///       TableScan: left
///       TableScan: right
/// ```
///
/// This rule decomposes the filter on the coalesced key into separate
/// filters for each side:
///
/// ```text
/// Projection: coalesce(left.id, right.id) AS id
///   Full Join: left.id = right.id
///     Filter: left.id = 1
///       TableScan: left
///     Filter: right.id = 1
///       TableScan: right
/// ```
///
/// `DataFusion`'s standard [`PushDownFilter`] can then push these filters
/// further down through projections and unions to eliminate partitions that
/// don't match.
///
/// # Handling Projections and Aliases
///
/// Filters are often applied to aliased columns from projections rather
/// than directly to the join output:
///
/// ```text
/// Filter: j1.id = 1
///   SubqueryAlias: j1
///     Projection: coalesce(left.id, right.id) AS id
///       Full Join: left.id = right.id
/// ```
///
/// The rule itself only matches a Filter sitting directly on a FULL JOIN. It
/// relies on the wrapped [`PushDownFilter`] to move the filter through the
/// subquery alias and projection first, which resolves `j1.id` back to
/// `coalesce(left.id, right.id)` by the time the filter reaches the join.
///
/// # Handling Nested Coalesce
///
/// Multi-level FULL JOINs produce nested coalesce expressions. For example:
///
/// ```text
/// coalesce(coalesce(a.id, b.id), c.id)
/// ```
///
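/// When each level is exposed through a projection, the rule handles these
/// one join at a time: each FULL JOIN splits the filter across its two
/// inputs, and the wrapped [`PushDownFilter`] carries the resulting filters
/// down to the next join, until every base column is filtered: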
///
/// ```text
/// Filters applied: a.id = 1, b.id = 1, c.id = 1
/// ```
///
/// # Handling AND Conjunctions
///
/// Like [`PushDownFilter`], this rule handles AND-connected filters
/// (conjunctions). Each conjunction term is processed independently:
///
/// ```text
/// Filter: coalesce(left.id, right.id) = 1 AND left.val = 2
///   Full Join: left.id = right.id
/// ```
///
/// The coalesce filter is decomposed and pushed, while the other filter
/// stays above the join, because pushing a one-sided filter below a FULL
/// JOIN would change the result:
///
/// ```text
/// Filter: left.val = 2
///   Full Join: left.id = right.id
///     Filter: left.id = 1
///       TableScan: left
///     Filter: right.id = 1
///       TableScan: right
/// ```
///
/// # Safety: OR Filters and Non-FULL JOINs
///
/// The rule only processes AND-connected filters containing coalesce
/// expressions on FULL JOINs. OR filters are left unchanged because
/// splitting them would be incorrect:
///
/// ```text
/// Filter: coalesce(left.id, right.id) = 1 OR left.val = 2
///   Full Join
/// ```
///
/// This filter is not touched by this rule and is left for `DataFusion`'s
/// standard [`PushDownFilter`], which moves it down to the join (resolving
/// aliases to the underlying coalesce) but not below it.
///
/// Similarly, filters on INNER, LEFT, or RIGHT joins are not processed
/// because coalesce semantics differ for those join types.
///
/// # Implementation Notes
///
/// This rule performs a top-down traversal. At each node:
///
/// 1. If the node is a Filter directly above a FULL JOIN, split the
///    predicate into its AND-connected terms
/// 2. Decompose coalesce-based filter terms into per-side filters
/// 3. Attach the filters to both join inputs
/// 4. Keep the remaining terms in a Filter above the join
/// 5. Feed the result back into `DataFusion`'s [`PushDownFilter`] via
///    `transform_data` for continued optimization, which also moves filters
///    through projections and subquery aliases toward the next join
///
/// [`PushDownFilter`]: datafusion::optimizer::push_down_filter::PushDownFilter
#[derive(Debug, Default)]
pub struct CoalesceAwarePushDownFilter {
push_down_filter: PushDownFilter,
}
impl CoalesceAwarePushDownFilter {
/// Creates new [`CoalesceAwarePushDownFilter`] optimizer rule.
#[must_use]
pub fn new() -> Self {
Self::default()
}
}
/// Returns the arguments of `expr` if it is a `coalesce` function call.
fn is_coalesce(expr: &Expr) -> Option<&[Expr]> {
if let Expr::ScalarFunction(ScalarFunction { func, args }) = expr
&& func
.inner()
.as_any()
.downcast_ref::<CoalesceFunc>()
.is_some()
{
return Some(args.as_slice());
}
None
}
/// Indicates which side of a join a column originates from.
#[derive(Clone, Copy)]
enum JoinSide {
Left,
Right,
}
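/// Returns the columns of a two-argument `coalesce` call over one left-side
/// and one right-side column, tagged by join side, or `None` for any other
/// shape. It does not check that the two columns are an equi-join key pair.
fn extract_join_columns(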
expr: &Expr,
left_cols: &HashSet<Column, RandomState>,
right_cols: &HashSet<Column, RandomState>,
) -> Option<Vec<(JoinSide, Column)>> {
let args = is_coalesce(expr)?;
if args.len() != 2 {
return None;
}
let mut result = Vec::with_capacity(args.len());
let (mut has_left, mut has_right) = (false, false);
for arg in args {
let Expr::Column(col) = arg else {
return None;
};
if left_cols.contains(col) {
has_left = true;
result.push((JoinSide::Left, col.clone()));
} else if right_cols.contains(col) {
has_right = true;
result.push((JoinSide::Right, col.clone()));
} else {
return None;
}
}
(has_left && has_right).then_some(result)
}
/// Tracks filters scheduled for each side of the join, along with the
/// columns they reference.
struct FilterPushState {
left_filters: Vec<Expr>,
right_filters: Vec<Expr>,
seen_left: HashSet<Column, RandomState>,
seen_right: HashSet<Column, RandomState>,
}
impl FilterPushState {
fn new() -> Self {
Self {
left_filters: Vec::new(),
right_filters: Vec::new(),
seen_left: HashSet::with_hasher(RandomState::new()),
seen_right: HashSet::with_hasher(RandomState::new()),
}
}
fn push(&mut self, side: JoinSide, column: Column, filter_expr: Expr) {
match side {
JoinSide::Left => {
self.seen_left.insert(column);
self.left_filters.push(filter_expr);
},
JoinSide::Right => {
self.seen_right.insert(column);
self.right_filters.push(filter_expr);
},
}
}
fn push_columns<F: FnMut(Expr) -> Expr>(
&mut self,
columns: Vec<(JoinSide, Column)>,
mut build_filter: F,
) {
for (side, column) in columns {
let filter_expr = build_filter(Expr::Column(column.clone()));
self.push(side, column, filter_expr);
}
}
fn try_push_term(
&mut self,
term: &Expr,
left_cols: &HashSet<Column, RandomState>,
right_cols: &HashSet<Column, RandomState>,
) -> bool {
match term {
Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
if let Some(columns) = extract_join_columns(left, left_cols,
right_cols) {
self.push_columns(columns, |replacement| {
Expr::BinaryExpr(BinaryExpr {
left: Box::new(replacement),
op: *op,
right: right.clone(),
})
});
return true;
}
false
},
Expr::IsNull(expr) => {
if let Some(columns) = extract_join_columns(expr, left_cols,
right_cols) {
self.push_columns(columns, |replacement|
Expr::IsNull(Box::new(replacement)));
return true;
}
false
},
Expr::IsNotNull(expr) => {
if let Some(columns) = extract_join_columns(expr, left_cols,
right_cols) {
self.push_columns(columns, |replacement| {
Expr::IsNotNull(Box::new(replacement))
});
return true;
}
false
},
_ => false,
}
}
}
/// Holds the outcome of splitting a predicate into join-side filters and
/// remaining terms.
struct SplitFilters {
left: Vec<Expr>,
right: Vec<Expr>,
remaining: Vec<Expr>,
}
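/// Splits `predicate` into AND-connected terms and sorts each term into
/// left-side filters, right-side filters, or remaining terms.
fn split_coalesce_predicate(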
predicate: &Expr,
left_cols: &HashSet<Column, RandomState>,
right_cols: &HashSet<Column, RandomState>,
) -> SplitFilters {
let mut state = FilterPushState::new();
let mut remaining = Vec::new();
for term in split_conjunction(predicate) {
if !state.try_push_term(term, left_cols, right_cols) {
remaining.push(term.clone());
}
}
SplitFilters {
left: state.left_filters,
right: state.right_filters,
remaining,
}
}
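/// Rewrites a `Filter` directly above a FULL JOIN by moving coalesce-based
/// terms into filters on both join inputs. Any other plan is returned
/// unchanged.
fn apply_coalesce_filter(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {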
let LogicalPlan::Filter(Filter {
predicate, input, ..
}) = &plan
else {
return Ok(Transformed::no(plan));
};
let LogicalPlan::Join(join) = input.as_ref() else {
return Ok(Transformed::no(plan));
};
if join.join_type != JoinType::Full {
return Ok(Transformed::no(plan));
}
let left_cols: HashSet<Column, RandomState> =
join.left.schema().columns().into_iter().collect();
let right_cols: HashSet<Column, RandomState> =
join.right.schema().columns().into_iter().collect();
let SplitFilters {
left,
right,
remaining,
} = split_coalesce_predicate(predicate, &left_cols, &right_cols);
if left.is_empty() && right.is_empty() {
return Ok(Transformed::no(plan));
}
let mut join = join.clone();
if let Some(left_predicate) = conjunction(left) {
join.left = Arc::new(LogicalPlan::Filter(Filter::try_new(
left_predicate,
join.left.clone(),
)?));
}
if let Some(right_predicate) = conjunction(right) {
join.right = Arc::new(LogicalPlan::Filter(Filter::try_new(
right_predicate,
join.right.clone(),
)?));
}
let mut plan = LogicalPlan::Join(join);
if let Some(remaining) = conjunction(remaining) {
plan = LogicalPlan::Filter(Filter::try_new(remaining,
Arc::new(plan))?);
}
Ok(Transformed::yes(plan))
}
impl OptimizerRule for CoalesceAwarePushDownFilter {
fn name(&self) -> &'static str {
"coalesce_aware_push_down_filter"
}
fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::TopDown)
}
fn supports_rewrite(&self) -> bool {
true
}
fn rewrite(
&self,
plan: LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
apply_coalesce_filter(plan)?
.transform_data(|plan| self.push_down_filter.rewrite(plan,
config))
}
}
#[cfg(test)]
mod tests {
use datafusion::{
arrow::datatypes::{DataType, Field, Schema},
datasource::{empty::EmptyTable, provider_as_source},
logical_expr::{LogicalPlanBuilder, col, lit},
optimizer::{Optimizer, OptimizerContext},
prelude::coalesce,
};
use pretty_assertions::assert_eq;
use super::*;
fn assert_optimized_plan_equal(plan: LogicalPlan, expected: LogicalPlan)
-> Result<()> {
let optimizer =
Optimizer::with_rules(vec![Arc::new(CoalesceAwarePushDownFilter::new())]);
let config = OptimizerContext::new().with_max_passes(1);
let optimized = optimizer.optimize(plan, &config, |_, _| {})?;
assert_eq!(format!("{optimized}").trim(),
format!("{expected}").trim());
Ok(())
}
#[test]
fn simple_test() -> Result<()> {
let schema = Schema::new(vec![Field::new("a", DataType::Int32,
true)]);
let table = EmptyTable::new(Arc::new(schema));
let scan = provider_as_source(Arc::new(table));
let plan = LogicalPlanBuilder::scan("t1", scan.clone(), None)?
.join(
LogicalPlanBuilder::scan("t2", scan.clone(), None)?.build()?,
JoinType::Full,
(vec!["a"], vec!["a"]),
None,
)?
.project(vec![coalesce(vec![col("t1.a"),
col("t2.a")]).alias("a")])?
.alias("j1")?
.filter(col("j1.a").eq(lit(1)))?
.build()?;
let expected = LogicalPlanBuilder::scan("t1", scan.clone(), None)?
.filter(col("t1.a").eq(lit(1)))?
.join(
LogicalPlanBuilder::scan("t2", scan.clone(), None)?
.filter(col("t2.a").eq(lit(1)))?
.build()?,
JoinType::Full,
(vec!["a"], vec!["a"]),
None,
)?
.project(vec![coalesce(vec![col("t1.a"),
col("t2.a")]).alias("a")])?
.alias("j1")?
.build()?;
assert_optimized_plan_equal(plan, expected)
}
#[test]
fn propagate_supported_test() -> Result<()> {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
]);
let table = EmptyTable::new(Arc::new(schema));
let scan = provider_as_source(Arc::new(table));
let plan = LogicalPlanBuilder::scan("t1", scan.clone(), None)?
.join(
LogicalPlanBuilder::scan("t2", scan.clone(), None)?.build()?,
JoinType::Full,
(vec!["a"], vec!["a"]),
None,
)?
.project(vec![
coalesce(vec![col("t1.a"), col("t2.a")]).alias("a"),
col("t1.b").alias("b1"),
col("t2.b").alias("b2"),
])?
.alias("j1")?
.filter(col("j1.a").gt(lit(1)).and(col("j1.b1").eq(lit(2))))?
.build()?;
let expected = LogicalPlanBuilder::scan("t1", scan.clone(), None)?
.filter(col("t1.a").gt(lit(1)))?
.join(
LogicalPlanBuilder::scan("t2", scan.clone(), None)?
.filter(col("t2.a").gt(lit(1)))?
.build()?,
JoinType::Full,
(vec!["a"], vec!["a"]),
None,
)?
.filter(col("t1.b").eq(lit(2)))?
.project(vec![
coalesce(vec![col("t1.a"), col("t2.a")]).alias("a"),
col("t1.b").alias("b1"),
col("t2.b").alias("b2"),
])?
.alias("j1")?
.build()?;
assert_optimized_plan_equal(plan, expected)
}
#[test]
fn do_not_propagate_unsupported_test() -> Result<()> {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Int32, true),
]);
let table = EmptyTable::new(Arc::new(schema));
let scan = provider_as_source(Arc::new(table));
let plan = LogicalPlanBuilder::scan("t1", scan.clone(), None)?
.join(
LogicalPlanBuilder::scan("t2", scan.clone(), None)?.build()?,
JoinType::Full,
(vec!["a"], vec!["a"]),
None,
)?
.project(vec![
coalesce(vec![col("t1.a"), col("t2.a")]).alias("a"),
col("t1.b").alias("b1"),
col("t2.b").alias("b2"),
])?
.alias("j1")?
.filter(col("j1.a").eq(lit(1)).or(col("j1.b1").eq(lit(2))))?
.build()?;
let expected = LogicalPlanBuilder::scan("t1", scan.clone(), None)?
.join(
LogicalPlanBuilder::scan("t2", scan.clone(), None)?.build()?,
JoinType::Full,
(vec!["a"], vec!["a"]),
None,
)?
.filter(
coalesce(vec![col("t1.a"), col("t2.a")])
.eq(lit(1))
.or(col("t1.b").eq(lit(2))),
)?
.project(vec![
coalesce(vec![col("t1.a"), col("t2.a")]).alias("a"),
col("t1.b").alias("b1"),
col("t2.b").alias("b2"),
])?
.alias("j1")?
.build()?;
assert_optimized_plan_equal(plan, expected)
}
#[test]
fn nested_test() -> Result<()> {
let schema = Schema::new(vec![Field::new("a", DataType::Int32,
true)]);
let table = EmptyTable::new(Arc::new(schema));
let scan = provider_as_source(Arc::new(table));
let plan = LogicalPlanBuilder::scan("t1", scan.clone(), None)?
.join(
LogicalPlanBuilder::scan("t2", scan.clone(), None)?
.join(
LogicalPlanBuilder::scan("t3", scan.clone(), None)?
.join(
LogicalPlanBuilder::scan("t4", scan.clone(),
None)?.build()?,
JoinType::Full,
(vec!["a"], vec!["a"]),
None,
)?
.project(vec![coalesce(vec![col("t3.a"),
col("t4.a")]).alias("a")])?
.alias("j3")?
.build()?,
JoinType::Full,
(vec!["a"], vec!["a"]),
None,
)?
.project(vec![coalesce(vec![col("t2.a"),
col("j3.a")]).alias("a")])?
.alias("j2")?
.build()?,
JoinType::Full,
(vec!["a"], vec!["a"]),
None,
)?
.project(vec![coalesce(vec![col("t1.a"),
col("j2.a")]).alias("a")])?
.alias("j1")?
.filter(col("j1.a").eq(lit(1)))?
.build()?;
let expected = LogicalPlanBuilder::scan("t1", scan.clone(), None)?
.filter(col("t1.a").eq(lit(1)))?
.join(
LogicalPlanBuilder::scan("t2", scan.clone(), None)?
.filter(col("t2.a").eq(lit(1)))?
.join(
LogicalPlanBuilder::scan("t3", scan.clone(), None)?
.filter(col("t3.a").eq(lit(1)))?
.join(
LogicalPlanBuilder::scan("t4", scan.clone(),
None)?
.filter(col("t4.a").eq(lit(1)))?
.build()?,
JoinType::Full,
(vec!["a"], vec!["a"]),
None,
)?
.project(vec![coalesce(vec![col("t3.a"),
col("t4.a")]).alias("a")])?
.alias("j3")?
.build()?,
JoinType::Full,
(vec!["a"], vec!["a"]),
None,
)?
.project(vec![coalesce(vec![col("t2.a"),
col("j3.a")]).alias("a")])?
.alias("j2")?
.build()?,
JoinType::Full,
(vec!["a"], vec!["a"]),
None,
)?
.project(vec![coalesce(vec![col("t1.a"),
col("j2.a")]).alias("a")])?
.alias("j1")?
.build()?;
assert_optimized_plan_equal(plan, expected)
}
#[test]
fn two_sided_test() -> Result<()> {
let schema = Schema::new(vec![Field::new("a", DataType::Int32,
true)]);
let table = EmptyTable::new(Arc::new(schema));
let scan = provider_as_source(Arc::new(table));
let plan = LogicalPlanBuilder::scan("t1", scan.clone(), None)?
.join(
LogicalPlanBuilder::scan("t2", scan.clone(), None)?.build()?,
JoinType::Full,
(vec!["a"], vec!["a"]),
None,
)?
.project(vec![coalesce(vec![col("t1.a"),
col("t2.a")]).alias("a")])?
.alias("j2")?
.join(
LogicalPlanBuilder::scan("t3", scan.clone(), None)?
.join(
LogicalPlanBuilder::scan("t4", scan.clone(),
None)?.build()?,
JoinType::Full,
(vec!["a"], vec!["a"]),
None,
)?
.project(vec![coalesce(vec![col("t3.a"),
col("t4.a")]).alias("a")])?
.alias("j3")?
.build()?,
JoinType::Full,
(vec!["a"], vec!["a"]),
None,
)?
.project(vec![coalesce(vec![col("j2.a"),
col("j3.a")]).alias("a")])?
.alias("j1")?
.filter(col("j1.a").eq(lit(1)))?
.build()?;
let expected = LogicalPlanBuilder::scan("t1", scan.clone(), None)?
.filter(col("t1.a").eq(lit(1)))?
.join(
LogicalPlanBuilder::scan("t2", scan.clone(), None)?
.filter(col("t2.a").eq(lit(1)))?
.build()?,
JoinType::Full,
(vec!["a"], vec!["a"]),
None,
)?
.project(vec![coalesce(vec![col("t1.a"),
col("t2.a")]).alias("a")])?
.alias("j2")?
.join(
LogicalPlanBuilder::scan("t3", scan.clone(), None)?
.filter(col("t3.a").eq(lit(1)))?
.join(
LogicalPlanBuilder::scan("t4", scan.clone(), None)?
.filter(col("t4.a").eq(lit(1)))?
.build()?,
JoinType::Full,
(vec!["a"], vec!["a"]),
None,
)?
.project(vec![coalesce(vec![col("t3.a"),
col("t4.a")]).alias("a")])?
.alias("j3")?
.build()?,
JoinType::Full,
(vec!["a"], vec!["a"]),
None,
)?
.project(vec![coalesce(vec![col("j2.a"),
col("j3.a")]).alias("a")])?
.alias("j1")?
.build()?;
assert_optimized_plan_equal(plan, expected)
}
#[test]
fn interchanged_test() -> Result<()> {
let schema = Schema::new(vec![Field::new("a", DataType::Int32,
true)]);
let table = EmptyTable::new(Arc::new(schema));
let scan = provider_as_source(Arc::new(table));
let plan = LogicalPlanBuilder::scan("t1", scan.clone(), None)?
.join(
LogicalPlanBuilder::scan("t2", scan.clone(), None)?
.join(
LogicalPlanBuilder::scan("t3", scan.clone(), None)?
.join(
LogicalPlanBuilder::scan("t4", scan.clone(),
None)?.build()?,
JoinType::Full,
(vec!["a"], vec!["a"]),
None,
)?
.project(vec![coalesce(vec![col("t3.a"),
col("t4.a")]).alias("a")])?
.alias("j3")?
.build()?,
JoinType::Inner,
(vec!["a"], vec!["a"]),
None,
)?
.project(vec![col("j3.a").alias("a")])?
.alias("j2")?
.build()?,
JoinType::Full,
(vec!["a"], vec!["a"]),
None,
)?
.project(vec![coalesce(vec![col("t1.a"),
col("j2.a")]).alias("a")])?
.alias("j1")?
.filter(col("j1.a").eq(lit(1)))?
.build()?;
let expected = LogicalPlanBuilder::scan("t1", scan.clone(), None)?
.filter(col("t1.a").eq(lit(1)))?
.join(
LogicalPlanBuilder::scan("t2", scan.clone(), None)?
.filter(col("t2.a").eq(lit(1)))?
.join(
LogicalPlanBuilder::scan("t3", scan.clone(), None)?
.filter(col("t3.a").eq(lit(1)))?
.join(
LogicalPlanBuilder::scan("t4", scan.clone(),
None)?
.filter(col("t4.a").eq(lit(1)))?
.build()?,
JoinType::Full,
(vec!["a"], vec!["a"]),
None,
)?
.project(vec![coalesce(vec![col("t3.a"),
col("t4.a")]).alias("a")])?
.alias("j3")?
.build()?,
JoinType::Inner,
(vec!["a"], vec!["a"]),
None,
)?
.project(vec![col("j3.a").alias("a")])?
.alias("j2")?
.build()?,
JoinType::Full,
(vec!["a"], vec!["a"]),
None,
)?
.project(vec![coalesce(vec![col("t1.a"),
col("j2.a")]).alias("a")])?
.alias("j1")?
.build()?;
assert_optimized_plan_equal(plan, expected)
}
}
```
</details>
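One gap a generalized version would probably need to close: `extract_join_columns` above only checks which side each column comes from, not that the two columns are actually an equi-join key pair of that join. Below is a minimal standalone sketch of such a check. `Col` and `is_join_key_pair` are hypothetical stand-ins, and the `on` slice is meant to mirror the key pairs stored on the join node:

```rust
/// Hypothetical stand-in for a qualified column such as `t1.a`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Col {
    relation: String,
    name: String,
}

impl Col {
    fn new(relation: &str, name: &str) -> Self {
        Self {
            relation: relation.to_string(),
            name: name.to_string(),
        }
    }
}

/// Returns true if `first` and `second` form one of the equi-join key pairs
/// in `on`, in either argument order.
fn is_join_key_pair(on: &[(Col, Col)], first: &Col, second: &Col) -> bool {
    on.iter().any(|(left, right)| {
        (left == first && right == second) || (left == second && right == first)
    })
}
```

With a guard like this, `coalesce(t1.a, t2.a)` would qualify for a join on `t1.a = t2.a`, while `coalesce(t1.b, t2.a)` would be left alone.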
### Additional context
The provided implementation relies on several assumptions that hold only in my codebase:
1. coalesce is used solely to combine FULL OUTER JOIN keys
2. in filters, the column reference is always on the left-hand side of the
binary expression
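The second assumption could likely be lifted by normalizing comparisons before matching them. A minimal standalone sketch of the idea follows. `CmpOp`, `Operand`, and `normalize` are hypothetical toy types, not DataFusion APIs. A real rule would do the equivalent on `Expr::BinaryExpr`:

```rust
/// Hypothetical comparison operators, standing in for the binary operator type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CmpOp {
    Eq,
    NotEq,
    Lt,
    LtEq,
    Gt,
    GtEq,
}

impl CmpOp {
    /// Operator that keeps the comparison equivalent after swapping operands.
    fn swapped(self) -> Self {
        match self {
            CmpOp::Eq => CmpOp::Eq,
            CmpOp::NotEq => CmpOp::NotEq,
            CmpOp::Lt => CmpOp::Gt,
            CmpOp::LtEq => CmpOp::GtEq,
            CmpOp::Gt => CmpOp::Lt,
            CmpOp::GtEq => CmpOp::LtEq,
        }
    }
}

/// Hypothetical operand: either a coalesce over column names or a literal.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Operand {
    Coalesce(Vec<String>),
    Literal(i64),
}

/// Moves a coalesce operand to the left-hand side, flipping the operator so
/// that `1 < coalesce(..)` becomes `coalesce(..) > 1`. Other shapes are
/// returned unchanged.
fn normalize(left: Operand, op: CmpOp, right: Operand) -> (Operand, CmpOp, Operand) {
    let coalesce_on_right = matches!(
        (&left, &right),
        (Operand::Literal(_), Operand::Coalesce(_))
    );
    if coalesce_on_right {
        (right, op.swapped(), left)
    } else {
        (left, op, right)
    }
}
```

After this step, the existing matching logic in `try_push_term` would only ever need to look at the left-hand operand.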