This is an automated email from the ASF dual-hosted git repository.
timsaucer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-python.git
The following commit(s) were added to refs/heads/main by this push:
new 474e9e67 fix: use coalesce instead of drop_duplicate_keys for join
(#1318)
474e9e67 is described below
commit 474e9e67bfda3074dc435d3d653de538b0cafa7b
Author: Daniel Mesejo <[email protected]>
AuthorDate: Mon Jan 5 14:24:09 2026 +0100
fix: use coalesce instead of drop_duplicate_keys for join (#1318)
* fix: use coalesce instead of drop_duplicate_keys for join
closes #1305
* fix(docs): join usage
---
docs/source/user-guide/common-operations/joins.rst | 6 +--
python/datafusion/dataframe.py | 16 ++++----
python/tests/test_dataframe.py | 43 +++++++++++++++++++---
src/dataframe.rs | 34 ++++++++++++-----
4 files changed, 73 insertions(+), 26 deletions(-)
diff --git a/docs/source/user-guide/common-operations/joins.rst
b/docs/source/user-guide/common-operations/joins.rst
index 035d7488..1d9d7038 100644
--- a/docs/source/user-guide/common-operations/joins.rst
+++ b/docs/source/user-guide/common-operations/joins.rst
@@ -107,9 +107,9 @@ Duplicate Keys
--------------
It is common to join two DataFrames on a common column name. Starting in
-version 51.0.0, ``datafusion-python``` will now drop duplicate column names by
+version 51.0.0, ``datafusion-python`` will now coalesce join key columns with identical names by
default. This reduces problems with ambiguous column selection after joins.
-You can disable this feature by setting the parameter ``drop_duplicate_keys``
+You can disable this feature by setting the parameter ``coalesce_duplicate_keys``
to ``False``.
.. ipython:: python
@@ -133,4 +133,4 @@ In contrast to the above example, if we wish to get both
columns:
.. ipython:: python
- left.join(right, "id", how="inner", drop_duplicate_keys=False)
+ left.join(right, "id", how="inner", coalesce_duplicate_keys=False)
diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py
index 12f1145a..d302c12a 100644
--- a/python/datafusion/dataframe.py
+++ b/python/datafusion/dataframe.py
@@ -778,7 +778,7 @@ class DataFrame:
left_on: None = None,
right_on: None = None,
join_keys: None = None,
- drop_duplicate_keys: bool = True,
+ coalesce_duplicate_keys: bool = True,
) -> DataFrame: ...
@overload
@@ -791,7 +791,7 @@ class DataFrame:
left_on: str | Sequence[str],
right_on: str | Sequence[str],
join_keys: tuple[list[str], list[str]] | None = None,
- drop_duplicate_keys: bool = True,
+ coalesce_duplicate_keys: bool = True,
) -> DataFrame: ...
@overload
@@ -804,7 +804,7 @@ class DataFrame:
join_keys: tuple[list[str], list[str]],
left_on: None = None,
right_on: None = None,
- drop_duplicate_keys: bool = True,
+ coalesce_duplicate_keys: bool = True,
) -> DataFrame: ...
def join(
@@ -816,7 +816,7 @@ class DataFrame:
left_on: str | Sequence[str] | None = None,
right_on: str | Sequence[str] | None = None,
join_keys: tuple[list[str], list[str]] | None = None,
- drop_duplicate_keys: bool = True,
+ coalesce_duplicate_keys: bool = True,
) -> DataFrame:
"""Join this :py:class:`DataFrame` with another :py:class:`DataFrame`.
@@ -829,9 +829,9 @@ class DataFrame:
"right", "full", "semi", "anti".
left_on: Join column of the left dataframe.
right_on: Join column of the right dataframe.
- drop_duplicate_keys: When True, the columns from the right
DataFrame
- that have identical names in the ``on`` fields to the left
DataFrame
- will be dropped.
+ coalesce_duplicate_keys: When True, coalesce the columns
+ from the right DataFrame and left DataFrame
+ that have identical names in the ``on`` fields.
join_keys: Tuple of two lists of column names to join on.
[Deprecated]
Returns:
@@ -879,7 +879,7 @@ class DataFrame:
right_on = [right_on]
return DataFrame(
- self.df.join(right.df, how, left_on, right_on, drop_duplicate_keys)
+ self.df.join(right.df, how, left_on, right_on,
coalesce_duplicate_keys)
)
def join_on(
diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py
index fa6a5fde..b68493b6 100644
--- a/python/tests/test_dataframe.py
+++ b/python/tests/test_dataframe.py
@@ -663,7 +663,7 @@ def test_join():
df1 = ctx.create_dataframe([[batch]], "r")
df2 = df.join(df1, on="a", how="inner")
- df2 = df2.sort(column("l.a"))
+ df2 = df2.sort(column("a"))
table = pa.Table.from_batches(df2.collect())
expected = {"a": [1, 2], "c": [8, 10], "b": [4, 5]}
@@ -673,8 +673,10 @@ def test_join():
# Since we may have a duplicate column name and pa.Table()
# hides the fact, instead we need to explicitly check the
# resultant arrays.
- df2 = df.join(df1, left_on="a", right_on="a", how="inner",
drop_duplicate_keys=True)
- df2 = df2.sort(column("l.a"))
+ df2 = df.join(
+ df1, left_on="a", right_on="a", how="inner",
coalesce_duplicate_keys=True
+ )
+ df2 = df2.sort(column("a"))
result = df2.collect()[0]
assert result.num_columns == 3
assert result.column(0) == pa.array([1, 2], pa.int64())
@@ -682,7 +684,7 @@ def test_join():
assert result.column(2) == pa.array([8, 10], pa.int64())
df2 = df.join(
- df1, left_on="a", right_on="a", how="inner", drop_duplicate_keys=False
+ df1, left_on="a", right_on="a", how="inner",
coalesce_duplicate_keys=False
)
df2 = df2.sort(column("l.a"))
result = df2.collect()[0]
@@ -695,7 +697,7 @@ def test_join():
# Verify we don't make a breaking change to pre-43.0.0
# where users would pass join_keys as a positional argument
df2 = df.join(df1, (["a"], ["a"]), how="inner")
- df2 = df2.sort(column("l.a"))
+ df2 = df2.sort(column("a"))
table = pa.Table.from_batches(df2.collect())
expected = {"a": [1, 2], "c": [8, 10], "b": [4, 5]}
@@ -720,7 +722,7 @@ def test_join_invalid_params():
with pytest.deprecated_call():
df2 = df.join(df1, join_keys=(["a"], ["a"]), how="inner")
df2.show()
- df2 = df2.sort(column("l.a"))
+ df2 = df2.sort(column("a"))
table = pa.Table.from_batches(df2.collect())
expected = {"a": [1, 2], "c": [8, 10], "b": [4, 5]}
@@ -778,6 +780,35 @@ def test_join_on():
assert table.to_pydict() == expected
+def test_join_full_with_drop_duplicate_keys():
+ ctx = SessionContext()
+
+ batch = pa.RecordBatch.from_arrays(
+ [pa.array([1, 3, 5, 7, 9]), pa.array([True, True, True, True, True])],
+ names=["log_time", "key_frame"],
+ )
+ key_frame = ctx.create_dataframe([[batch]])
+
+ batch = pa.RecordBatch.from_arrays(
+ [pa.array([2, 4, 6, 8, 10])],
+ names=["log_time"],
+ )
+ query_times = ctx.create_dataframe([[batch]])
+
+ merged = query_times.join(
+ key_frame,
+ left_on="log_time",
+ right_on="log_time",
+ how="full",
+ coalesce_duplicate_keys=True,
+ )
+ merged = merged.sort(column("log_time"))
+ result = merged.collect()[0]
+
+ assert result.num_columns == 2
+ assert result.column(0).to_pylist() == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
+
+
def test_join_on_invalid_expr():
ctx = SessionContext()
diff --git a/src/dataframe.rs b/src/dataframe.rs
index 0cf03d63..7bd601de 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -653,7 +653,7 @@ impl PyDataFrame {
how: &str,
left_on: Vec<PyBackedStr>,
right_on: Vec<PyBackedStr>,
- drop_duplicate_keys: bool,
+ coalesce_keys: bool,
) -> PyDataFusionResult<Self> {
let join_type = match how {
"inner" => JoinType::Inner,
@@ -680,7 +680,7 @@ impl PyDataFrame {
None,
)?;
- if drop_duplicate_keys {
+ if coalesce_keys {
let mutual_keys = left_keys
.iter()
.zip(right_keys.iter())
@@ -688,15 +688,16 @@ impl PyDataFrame {
.map(|(key, _)| *key)
.collect::<Vec<_>>();
- let fields_to_drop = mutual_keys
+ let fields_to_coalesce = mutual_keys
.iter()
.map(|name| {
- df.logical_plan()
+ let qualified_fields = df
+ .logical_plan()
.schema()
- .qualified_fields_with_unqualified_name(name)
+ .qualified_fields_with_unqualified_name(name);
+ (*name, qualified_fields)
})
- .filter(|r| r.len() == 2)
- .map(|r| r[1])
+ .filter(|(_, fields)| fields.len() == 2)
.collect::<Vec<_>>();
let expr: Vec<Expr> = df
@@ -706,8 +707,23 @@ impl PyDataFrame {
.into_iter()
.enumerate()
.map(|(idx, _)|
df.logical_plan().schema().qualified_field(idx))
- .filter(|(qualifier, f)|
!fields_to_drop.contains(&(*qualifier, f)))
- .map(|(qualifier, field)|
Expr::Column(Column::from((qualifier, field))))
+ .filter_map(|(qualifier, field)| {
+ if let Some((key_name, qualified_fields)) =
fields_to_coalesce
+ .iter()
+ .find(|(_, qf)| qf.contains(&(qualifier, field)))
+ {
+ // Only add the coalesce expression once (when we
encounter the first field)
+ // Skip the second field (it's already included in to
coalesce)
+ if (qualifier, field) == qualified_fields[0] {
+ let left_col =
Expr::Column(Column::from(qualified_fields[0]));
+ let right_col =
Expr::Column(Column::from(qualified_fields[1]));
+ return Some(coalesce(vec![left_col,
right_col]).alias(*key_name));
+ }
+ None
+ } else {
+ Some(Expr::Column(Column::from((qualifier, field))))
+ }
+ })
.collect();
df = df.select(expr)?;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]