Satyr09 commented on code in PR #21121:
URL: https://github.com/apache/datafusion/pull/21121#discussion_r2977005704
##########
datafusion/substrait/tests/cases/consumer_integration.rs:
##########
@@ -762,4 +762,40 @@ mod tests {
Ok(())
}
+
+ /// Regression: a Substrait join with both `equal` and
`is_not_distinct_from`
+ /// must demote `IS NOT DISTINCT FROM` to the join filter (matching the SQL
+ /// planner behavior tested in `join_is_not_distinct_from.slt:179-205`).
+ #[tokio::test]
+ async fn test_mixed_join_equal_and_indistinct_from_substrait_plan() ->
Result<()> {
+ let plan_str =
+ test_plan_to_string("mixed_join_equal_and_indistinct.json").await?;
+ // Eq becomes the equijoin key; IS NOT DISTINCT FROM is demoted to
filter.
+ assert_snapshot!(
+ plan_str,
+ @r#"
+ Projection: left.id, left.val, left.comment, right.id AS id0,
right.val AS val0, right.comment AS comment0
+ Inner Join: left.id = right.id Filter: left.val IS NOT DISTINCT FROM
right.val
+ SubqueryAlias: left
+ Values: (Utf8("1"), Utf8("a"), Utf8("c1")), (Utf8("2"),
Utf8("b"), Utf8("c2")), (Utf8("3"), Utf8(NULL), Utf8("c3")), (Utf8("4"),
Utf8(NULL), Utf8("c4")), (Utf8("5"), Utf8("e"), Utf8("c5"))...
+ SubqueryAlias: right
+ Values: (Utf8("1"), Utf8("a"), Utf8("c1")), (Utf8("2"),
Utf8("b"), Utf8("c2")), (Utf8("3"), Utf8(NULL), Utf8("c3")), (Utf8("4"),
Utf8(NULL), Utf8("c4")), (Utf8("5"), Utf8("e"), Utf8("c5"))...
+ "#
+ );
+
+ // Also execute to verify NULL=NULL rows (ids 3,4) are preserved.
+ let path =
"tests/testdata/test_plans/mixed_join_equal_and_indistinct.json";
+ let proto = serde_json::from_reader::<_, Plan>(BufReader::new(
+ File::open(path).expect("file not found"),
+ ))
+ .expect("failed to parse json");
+ let ctx = SessionContext::new();
+ let plan = from_substrait_plan(&ctx.state(), &proto).await?;
+ let df = ctx.execute_logical_plan(plan).await?;
+ let results = df.collect().await?;
+ let total_rows: usize = results.iter().map(|b| b.num_rows()).sum();
+ assert_eq!(total_rows, 6);
Review Comment:
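Just to make this regression test a bit stronger - can we also assert that the
specific rows we expect (e.g. the NULL = NULL rows for ids 3 and 4) are actually
present in the result, rather than only checking the total row count?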
##########
datafusion/substrait/src/logical_plan/consumer/rel/join_rel.rs:
##########
@@ -89,49 +84,61 @@ pub async fn from_join_rel(
}
fn split_eq_and_noneq_join_predicate_with_nulls_equality(
- filter: &Expr,
-) -> (Vec<(Column, Column)>, bool, Option<Expr>) {
- let exprs = split_conjunction(filter);
+ filter: Expr,
+) -> (Vec<(Column, Column)>, NullEquality, Option<Expr>) {
+ let exprs = split_conjunction_owned(filter);
- let mut accum_join_keys: Vec<(Column, Column)> = vec![];
+ let mut eq_keys: Vec<(Column, Column)> = vec![];
+ let mut indistinct_keys: Vec<(Column, Column)> = vec![];
let mut accum_filters: Vec<Expr> = vec![];
- let mut nulls_equal_nulls = false;
for expr in exprs {
- #[expect(clippy::collapsible_match)]
match expr {
- Expr::BinaryExpr(binary_expr) => match binary_expr {
- x @ (BinaryExpr {
- left,
- op: Operator::Eq,
- right,
+ Expr::BinaryExpr(BinaryExpr {
+ left,
+ op: op @ (Operator::Eq | Operator::IsNotDistinctFrom),
+ right,
+ }) => match (*left, *right) {
+ (Expr::Column(l), Expr::Column(r)) => match op {
+ Operator::Eq => eq_keys.push((l, r)),
+ Operator::IsNotDistinctFrom => indistinct_keys.push((l,
r)),
+ _ => unreachable!(),
+ },
+ (left, right) => {
+ accum_filters.push(Expr::BinaryExpr(BinaryExpr {
+ left: Box::new(left),
+ op,
+ right: Box::new(right),
+ }));
}
- | BinaryExpr {
- left,
- op: Operator::IsNotDistinctFrom,
- right,
- }) => {
- nulls_equal_nulls = match x.op {
- Operator::Eq => false,
- Operator::IsNotDistinctFrom => true,
- _ => unreachable!(),
- };
-
- match (left.as_ref(), right.as_ref()) {
- (Expr::Column(l), Expr::Column(r)) => {
- accum_join_keys.push((l.clone(), r.clone()));
- }
- _ => accum_filters.push(expr.clone()),
- }
- }
- _ => accum_filters.push(expr.clone()),
},
- _ => accum_filters.push(expr.clone()),
+ _ => accum_filters.push(expr),
}
}
+ let (join_keys, null_equality) =
+ match (eq_keys.is_empty(), indistinct_keys.is_empty()) {
+ // Mixed: use eq_keys as equijoin keys, demote indistinct keys to
filter
Review Comment:
We unconditionally favour Eq keys here, but if we have a case where there
exist multiple (say 4) `IS NOT DISTINCT FROM` column pairs and 1 `Eq` column
pair, this demotes all 4 to the filter and keeps just the 1 eq key, right?
But, in this case, would the inverse (demoting the single eq to the filter) not
allow more columns to participate in the hash partitioning/pruning and
therefore be a bit more performant?
A more selective hash key = fewer candidate pairs survive, so fewer
row-by-row filter evaluations are needed, if I understand correctly?
##########
datafusion/substrait/src/logical_plan/consumer/rel/join_rel.rs:
##########
@@ -153,3 +160,104 @@ fn from_substrait_jointype(join_type: i32) ->
datafusion::common::Result<JoinTyp
plan_err!("invalid join type variant {join_type}")
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ fn col(name: &str) -> Expr {
+ Expr::Column(Column::from_name(name))
+ }
+
+ #[test]
+ fn split_only_eq_keys() {
+ // equal(a, b)
+ let expr = Expr::BinaryExpr(BinaryExpr {
+ left: Box::new(col("a")),
+ op: Operator::Eq,
+ right: Box::new(col("b")),
+ });
+
+ let (keys, null_eq, filter) =
+ split_eq_and_noneq_join_predicate_with_nulls_equality(expr);
+
+ assert_eq!(keys.len(), 1);
+ assert_eq!(null_eq, NullEquality::NullEqualsNothing);
+ assert!(filter.is_none());
+ }
+
+ #[test]
+ fn split_only_indistinct_keys() {
+ // is_not_distinct_from(a, b)
+ let expr = Expr::BinaryExpr(BinaryExpr {
+ left: Box::new(col("a")),
+ op: Operator::IsNotDistinctFrom,
+ right: Box::new(col("b")),
+ });
+
+ let (keys, null_eq, filter) =
+ split_eq_and_noneq_join_predicate_with_nulls_equality(expr);
+
+ assert_eq!(keys.len(), 1);
+ assert_eq!(null_eq, NullEquality::NullEqualsNull);
+ assert!(filter.is_none());
+ }
+
+ /// Regression: mixed `equal` + `is_not_distinct_from` must demote
+ /// the indistinct key to the join filter so the single NullEquality
+ /// flag stays consistent (NullEqualsNothing for the eq keys).
+ #[test]
+ fn split_mixed_eq_and_indistinct_demotes_indistinct_to_filter() {
+ // is_not_distinct_from(val_l, val_r) AND equal(id_l, id_r)
+ let expr = Expr::BinaryExpr(BinaryExpr {
+ left: Box::new(col("val_l")),
+ op: Operator::IsNotDistinctFrom,
+ right: Box::new(col("val_r")),
+ })
+ .and(Expr::BinaryExpr(BinaryExpr {
+ left: Box::new(col("id_l")),
+ op: Operator::Eq,
+ right: Box::new(col("id_r")),
+ }));
+
+ let (keys, null_eq, filter) =
+ split_eq_and_noneq_join_predicate_with_nulls_equality(expr);
+
+ // Only the Eq key should be an equijoin key.
+ assert_eq!(keys.len(), 1);
+ assert_eq!(keys[0].0, Column::from_name("id_l"));
+ assert_eq!(keys[0].1, Column::from_name("id_r"));
+ assert_eq!(null_eq, NullEquality::NullEqualsNothing);
+
+ // The IsNotDistinctFrom predicate should be demoted to the filter.
+ let filter =
+ filter.expect("filter should contain the demoted indistinct
predicate");
+ match &filter {
+ Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
+ assert_eq!(*op, Operator::IsNotDistinctFrom);
+ assert_eq!(**left, col("val_l"));
+ assert_eq!(**right, col("val_r"));
+ }
+ other => panic!("expected BinaryExpr, got {other:?}"),
+ }
+ }
+
+ #[test]
+ fn split_non_column_eq_goes_to_filter() {
+ // equal(literal, column) — non-column operand goes to filter
+ let expr = Expr::BinaryExpr(BinaryExpr {
+ left: Box::new(Expr::Literal(
+ datafusion::common::ScalarValue::Utf8(Some("x".into())),
+ None,
+ )),
+ op: Operator::Eq,
+ right: Box::new(col("b")),
+ });
+
+ let (keys, _, filter) =
+ split_eq_and_noneq_join_predicate_with_nulls_equality(expr);
+
+ assert!(keys.is_empty());
+ assert!(filter.is_some());
+ }
+}
Review Comment:
Can we also add some tests for multiple `IS NOT DISTINCT FROM` keys being
demoted simultaneously?
Also, in the integration tests - can we check for other types of joins
(outer etc.)?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]