This is an automated email from the ASF dual-hosted git repository.

timsaucer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 474e9e67 fix: use coalesce instead of drop_duplicate_keys for join (#1318)
474e9e67 is described below

commit 474e9e67bfda3074dc435d3d653de538b0cafa7b
Author: Daniel Mesejo <[email protected]>
AuthorDate: Mon Jan 5 14:24:09 2026 +0100

    fix: use coalesce instead of drop_duplicate_keys for join (#1318)
    
    * fix: use coalesce instead of drop_duplicate_keys for join
    
    closes #1305
    
    * fix(docs): join usage
---
 docs/source/user-guide/common-operations/joins.rst |  6 +--
 python/datafusion/dataframe.py                     | 16 ++++----
 python/tests/test_dataframe.py                     | 43 +++++++++++++++++++---
 src/dataframe.rs                                   | 34 ++++++++++++-----
 4 files changed, 73 insertions(+), 26 deletions(-)
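
For context, the user-visible effect of the change is that a join on a shared
key column now yields a single coalesced key column by default. A minimal
sketch of the resulting behaviour (hypothetical data; coalesce_duplicate_keys
is the parameter introduced by this commit):

    from datafusion import SessionContext

    ctx = SessionContext()
    left = ctx.from_pydict({"id": [1, 2], "b": [10, 20]}, name="l")
    right = ctx.from_pydict({"id": [2, 3], "c": [200, 300]}, name="r")

    # Default (coalesce_duplicate_keys=True): one "id" column -> id, b, c
    left.join(right, on="id", how="full").show()

    # Opt out: both key columns are kept -> l.id, b, r.id, c
    left.join(right, on="id", how="full", coalesce_duplicate_keys=False).show()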

diff --git a/docs/source/user-guide/common-operations/joins.rst b/docs/source/user-guide/common-operations/joins.rst
index 035d7488..1d9d7038 100644
--- a/docs/source/user-guide/common-operations/joins.rst
+++ b/docs/source/user-guide/common-operations/joins.rst
@@ -107,9 +107,9 @@ Duplicate Keys
 --------------
 
 It is common to join two DataFrames on a common column name. Starting in
-version 51.0.0, ``datafusion-python``` will now drop duplicate column names by
+version 51.0.0, ``datafusion-python`` will now coalesce columns with identical names by
 default. This reduces problems with ambiguous column selection after joins.
-You can disable this feature by setting the parameter ``drop_duplicate_keys``
+You can disable this feature by setting the parameter ``coalesce_duplicate_keys``
 to ``False``.
 
 .. ipython:: python
@@ -133,4 +133,4 @@ In contrast to the above example, if we wish to get both columns:
 
 .. ipython:: python
 
-    left.join(right, "id", how="inner", drop_duplicate_keys=False)
+    left.join(right, "id", how="inner", coalesce_duplicate_keys=False)
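
The docs change above reflects the behaviour that motivated #1305: with an
outer join, dropping the right-hand key column loses key values that only
exist on the right side, while coalescing keeps them. A rough illustration
with made-up data:

    from datafusion import SessionContext, column

    ctx = SessionContext()
    left = ctx.from_pydict({"log_time": [1, 3, 5]}, name="l")
    right = ctx.from_pydict({"log_time": [2, 4, 6]}, name="r")

    merged = left.join(right, on="log_time", how="full")
    # The coalesced key holds every value from both sides, with no nulls:
    merged.sort(column("log_time")).show()  # 1, 2, 3, 4, 5, 6
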
diff --git a/python/datafusion/dataframe.py b/python/datafusion/dataframe.py
index 12f1145a..d302c12a 100644
--- a/python/datafusion/dataframe.py
+++ b/python/datafusion/dataframe.py
@@ -778,7 +778,7 @@ class DataFrame:
         left_on: None = None,
         right_on: None = None,
         join_keys: None = None,
-        drop_duplicate_keys: bool = True,
+        coalesce_duplicate_keys: bool = True,
     ) -> DataFrame: ...
 
     @overload
@@ -791,7 +791,7 @@ class DataFrame:
         left_on: str | Sequence[str],
         right_on: str | Sequence[str],
         join_keys: tuple[list[str], list[str]] | None = None,
-        drop_duplicate_keys: bool = True,
+        coalesce_duplicate_keys: bool = True,
     ) -> DataFrame: ...
 
     @overload
@@ -804,7 +804,7 @@ class DataFrame:
         join_keys: tuple[list[str], list[str]],
         left_on: None = None,
         right_on: None = None,
-        drop_duplicate_keys: bool = True,
+        coalesce_duplicate_keys: bool = True,
     ) -> DataFrame: ...
 
     def join(
@@ -816,7 +816,7 @@ class DataFrame:
         left_on: str | Sequence[str] | None = None,
         right_on: str | Sequence[str] | None = None,
         join_keys: tuple[list[str], list[str]] | None = None,
-        drop_duplicate_keys: bool = True,
+        coalesce_duplicate_keys: bool = True,
     ) -> DataFrame:
         """Join this :py:class:`DataFrame` with another :py:class:`DataFrame`.
 
@@ -829,9 +829,9 @@ class DataFrame:
                 "right", "full", "semi", "anti".
             left_on: Join column of the left dataframe.
             right_on: Join column of the right dataframe.
-            drop_duplicate_keys: When True, the columns from the right DataFrame
-                that have identical names in the ``on`` fields to the left DataFrame
-                will be dropped.
+            coalesce_duplicate_keys: When True, coalesce the columns from
+                the left and right DataFrames that have identical names
+                in the ``on`` fields into a single column.
             join_keys: Tuple of two lists of column names to join on. [Deprecated]
 
         Returns:
@@ -879,7 +879,7 @@ class DataFrame:
             right_on = [right_on]
 
         return DataFrame(
-            self.df.join(right.df, how, left_on, right_on, drop_duplicate_keys)
+            self.df.join(right.df, how, left_on, right_on, coalesce_duplicate_keys)
         )
 
     def join_on(
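
When callers opt out of the coalescing described in the docstring above, both
key columns remain in the result and have to be disambiguated through their
qualifiers, as the updated tests do below. A short sketch, assuming dataframes
registered under the names "l" and "r":

    from datafusion import SessionContext, column

    ctx = SessionContext()
    left = ctx.from_pydict({"a": [1, 2], "b": [4, 5]}, name="l")
    right = ctx.from_pydict({"a": [1, 2], "c": [8, 10]}, name="r")

    df = left.join(right, left_on="a", right_on="a", how="inner",
                   coalesce_duplicate_keys=False)
    # Two "a" columns survive, so reference them through the qualifier:
    df.sort(column("l.a")).show()
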
diff --git a/python/tests/test_dataframe.py b/python/tests/test_dataframe.py
index fa6a5fde..b68493b6 100644
--- a/python/tests/test_dataframe.py
+++ b/python/tests/test_dataframe.py
@@ -663,7 +663,7 @@ def test_join():
     df1 = ctx.create_dataframe([[batch]], "r")
 
     df2 = df.join(df1, on="a", how="inner")
-    df2 = df2.sort(column("l.a"))
+    df2 = df2.sort(column("a"))
     table = pa.Table.from_batches(df2.collect())
 
     expected = {"a": [1, 2], "c": [8, 10], "b": [4, 5]}
@@ -673,8 +673,10 @@ def test_join():
     # Since we may have a duplicate column name and pa.Table()
     # hides the fact, instead we need to explicitly check the
     # resultant arrays.
-    df2 = df.join(df1, left_on="a", right_on="a", how="inner", drop_duplicate_keys=True)
-    df2 = df2.sort(column("l.a"))
+    df2 = df.join(
+        df1, left_on="a", right_on="a", how="inner", coalesce_duplicate_keys=True
+    )
+    df2 = df2.sort(column("a"))
     result = df2.collect()[0]
     assert result.num_columns == 3
     assert result.column(0) == pa.array([1, 2], pa.int64())
@@ -682,7 +684,7 @@ def test_join():
     assert result.column(2) == pa.array([8, 10], pa.int64())
 
     df2 = df.join(
-        df1, left_on="a", right_on="a", how="inner", drop_duplicate_keys=False
+        df1, left_on="a", right_on="a", how="inner", coalesce_duplicate_keys=False
     )
     df2 = df2.sort(column("l.a"))
     result = df2.collect()[0]
@@ -695,7 +697,7 @@ def test_join():
     # Verify we don't make a breaking change to pre-43.0.0
     # where users would pass join_keys as a positional argument
     df2 = df.join(df1, (["a"], ["a"]), how="inner")
-    df2 = df2.sort(column("l.a"))
+    df2 = df2.sort(column("a"))
     table = pa.Table.from_batches(df2.collect())
 
     expected = {"a": [1, 2], "c": [8, 10], "b": [4, 5]}
@@ -720,7 +722,7 @@ def test_join_invalid_params():
     with pytest.deprecated_call():
         df2 = df.join(df1, join_keys=(["a"], ["a"]), how="inner")
         df2.show()
-        df2 = df2.sort(column("l.a"))
+        df2 = df2.sort(column("a"))
         table = pa.Table.from_batches(df2.collect())
 
         expected = {"a": [1, 2], "c": [8, 10], "b": [4, 5]}
@@ -778,6 +780,35 @@ def test_join_on():
     assert table.to_pydict() == expected
 
 
+def test_join_full_with_drop_duplicate_keys():
+    ctx = SessionContext()
+
+    batch = pa.RecordBatch.from_arrays(
+        [pa.array([1, 3, 5, 7, 9]), pa.array([True, True, True, True, True])],
+        names=["log_time", "key_frame"],
+    )
+    key_frame = ctx.create_dataframe([[batch]])
+
+    batch = pa.RecordBatch.from_arrays(
+        [pa.array([2, 4, 6, 8, 10])],
+        names=["log_time"],
+    )
+    query_times = ctx.create_dataframe([[batch]])
+
+    merged = query_times.join(
+        key_frame,
+        left_on="log_time",
+        right_on="log_time",
+        how="full",
+        coalesce_duplicate_keys=True,
+    )
+    merged = merged.sort(column("log_time"))
+    result = merged.collect()[0]
+
+    assert result.num_columns == 2
+    assert result.column(0).to_pylist() == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
+
+
 def test_join_on_invalid_expr():
     ctx = SessionContext()
 
diff --git a/src/dataframe.rs b/src/dataframe.rs
index 0cf03d63..7bd601de 100644
--- a/src/dataframe.rs
+++ b/src/dataframe.rs
@@ -653,7 +653,7 @@ impl PyDataFrame {
         how: &str,
         left_on: Vec<PyBackedStr>,
         right_on: Vec<PyBackedStr>,
-        drop_duplicate_keys: bool,
+        coalesce_keys: bool,
     ) -> PyDataFusionResult<Self> {
         let join_type = match how {
             "inner" => JoinType::Inner,
@@ -680,7 +680,7 @@ impl PyDataFrame {
             None,
         )?;
 
-        if drop_duplicate_keys {
+        if coalesce_keys {
             let mutual_keys = left_keys
                 .iter()
                 .zip(right_keys.iter())
@@ -688,15 +688,16 @@ impl PyDataFrame {
                 .map(|(key, _)| *key)
                 .collect::<Vec<_>>();
 
-            let fields_to_drop = mutual_keys
+            let fields_to_coalesce = mutual_keys
                 .iter()
                 .map(|name| {
-                    df.logical_plan()
+                    let qualified_fields = df
+                        .logical_plan()
                         .schema()
-                        .qualified_fields_with_unqualified_name(name)
+                        .qualified_fields_with_unqualified_name(name);
+                    (*name, qualified_fields)
                 })
-                .filter(|r| r.len() == 2)
-                .map(|r| r[1])
+                .filter(|(_, fields)| fields.len() == 2)
                 .collect::<Vec<_>>();
 
             let expr: Vec<Expr> = df
@@ -706,8 +707,23 @@ impl PyDataFrame {
                 .into_iter()
                 .enumerate()
                 .map(|(idx, _)| df.logical_plan().schema().qualified_field(idx))
-                .filter(|(qualifier, f)| !fields_to_drop.contains(&(*qualifier, f)))
-                .map(|(qualifier, field)| Expr::Column(Column::from((qualifier, field))))
+                .filter_map(|(qualifier, field)| {
+                    if let Some((key_name, qualified_fields)) = fields_to_coalesce
+                        .iter()
+                        .find(|(_, qf)| qf.contains(&(qualifier, field)))
+                    {
+                        // Only add the coalesce expression once (when we encounter the first field)
+                        // Skip the second field (it's already included in the coalesce)
+                        if (qualifier, field) == qualified_fields[0] {
+                            let left_col = Expr::Column(Column::from(qualified_fields[0]));
+                            let right_col = Expr::Column(Column::from(qualified_fields[1]));
+                            return Some(coalesce(vec![left_col, right_col]).alias(*key_name));
+                        }
+                        None
+                    } else {
+                        Some(Expr::Column(Column::from((qualifier, field))))
+                    }
+                })
                 .collect();
             df = df.select(expr)?;
         }
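
In effect, the Rust change above swaps a drop-column projection for a coalesce
projection. Expressed with the public Python API rather than the internal
logical-plan code, the generated select is roughly equivalent to the following
sketch (functions.coalesce is the public scalar-function wrapper):

    from datafusion import SessionContext, column
    from datafusion import functions as f

    ctx = SessionContext()
    left = ctx.from_pydict({"a": [1, 2], "b": [4, 5]}, name="l")
    right = ctx.from_pydict({"a": [1, 2], "c": [8, 10]}, name="r")

    joined = left.join(right, left_on="a", right_on="a", how="inner",
                       coalesce_duplicate_keys=False)
    # What coalesce_duplicate_keys=True builds for you: one coalesced key
    # column plus the remaining, non-key columns.
    joined.select(
        f.coalesce(column("l.a"), column("r.a")).alias("a"),
        column("b"),
        column("c"),
    ).show()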


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
