alamb commented on code in PR #5210:
URL: https://github.com/apache/arrow-datafusion/pull/5210#discussion_r1099265795
##########
datafusion/core/src/dataframe.rs:
##########
@@ -1039,6 +1088,33 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn join_on() -> Result<()> {
+ let left = test_table_with_name("a")
+ .await?
+ .select_columns(&["c1", "c2"])?;
+ let right = test_table_with_name("b")
+ .await?
+ .select_columns(&["c1", "c2"])?;
+ let join = left.join_on(
+ right,
+ JoinType::Inner,
+ [
+ col("a.c1").not_eq(col("b.c1")),
+ col("a.c2").not_eq(col("b.c2")),
Review Comment:
Would it be possible to also add an equality predicate here, to demonstrate
that such predicates are automatically recognized as equi-join predicates?
Perhaps something like
```suggestion
col("a.c2").eq(col("b.c2")),
```
##########
datafusion/expr/src/logical_plan/builder.rs:
##########
@@ -502,6 +505,25 @@ impl LogicalPlanBuilder {
));
}
+ let filter = if let Some(expr) = filter {
+ // ambiguous check
+ ensure_any_column_reference_is_unambiguous(
+ &expr,
+ &[self.schema(), right.schema()],
+ )?;
+
+ // normalize all columns in expression
+ let using_columns = expr.to_columns()?;
+ let filter = normalize_col_with_schemas(
+ expr,
+ &[self.schema(), right.schema()],
+ &[using_columns],
+ )?;
+ Some(filter)
+ } else {
+ None
+ };
+
Review Comment:
I think it is fine to include this in the PR as long as it also has a test
(for the ambiguity check, using the DataFrame API)
##########
datafusion/core/src/dataframe.rs:
##########
@@ -363,6 +363,55 @@ impl DataFrame {
Ok(DataFrame::new(self.session_state, plan))
}
+ /// Join this DataFrame with another DataFrame using the specified expressions.
+ ///
+ /// Simply a thin wrapper over [`join`](Self::join) where the join keys are not provided,
+ /// and the provided expressions are AND'ed together to form the filter expression.
+ ///
+ /// ```
+ /// # use datafusion::prelude::*;
+ /// # use datafusion::error::Result;
+ /// # #[tokio::main]
+ /// # async fn main() -> Result<()> {
+ /// let ctx = SessionContext::new();
+ /// let left = ctx
+ /// .read_csv("tests/data/example.csv", CsvReadOptions::new())
+ /// .await?;
+ /// let right = ctx
+ /// .read_csv("tests/data/example.csv", CsvReadOptions::new())
+ /// .await?
+ /// .select(vec![
+ /// col("a").alias("a2"),
+ /// col("b").alias("b2"),
+ /// col("c").alias("c2"),
+ /// ])?;
+ /// let join_on = left.join_on(
+ /// right,
+ /// JoinType::Inner,
+ /// [col("a").not_eq(col("a2")), col("b").not_eq(col("b2"))],
+ /// )?;
+ /// let batches = join_on.collect().await?;
+ /// # Ok(())
+ /// # }
+ /// ```
+ pub fn join_on(
Review Comment:
👍 LGTM
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]