This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 1bfe7404fd fix: preserve column qualifier for `DataFrame::with_column` (#7792)
1bfe7404fd is described below

commit 1bfe7404fd2c08d9a8717ccc8f04063a6e254b8e
Author: Jonah Gao <[email protected]>
AuthorDate: Thu Oct 12 04:26:45 2023 +0800

    fix: preserve column qualifier for `DataFrame::with_column` (#7792)
    
    * fix: preserve column qualifier for `DataFrame::with_column`
    
    * fix test variable
    
    * review feedback: add self join test
---
 datafusion/core/src/dataframe.rs | 130 +++++++++++++++++++++++++++++++++++++--
 1 file changed, 126 insertions(+), 4 deletions(-)

diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs
index f649080112..a06722f92c 100644
--- a/datafusion/core/src/dataframe.rs
+++ b/datafusion/core/src/dataframe.rs
@@ -1144,10 +1144,7 @@ impl DataFrame {
                     col_exists = true;
                     new_column.clone()
                 } else {
-                    Expr::Column(Column {
-                        relation: None,
-                        name: f.name().into(),
-                    })
+                    col(f.qualified_column())
                 }
             })
             .collect();
@@ -1855,6 +1852,131 @@ mod tests {
         Ok(())
     }
 
+    // Test issue: https://github.com/apache/arrow-datafusion/issues/7790
+    // The join operation outputs two identical column names, but they belong to different relations.
+    #[tokio::test]
+    async fn with_column_join_same_columns() -> Result<()> {
+        let df = test_table().await?.select_columns(&["c1"])?;
+        let ctx = SessionContext::new();
+
+        let table = df.into_view();
+        ctx.register_table("t1", table.clone())?;
+        ctx.register_table("t2", table)?;
+        let df = ctx
+            .table("t1")
+            .await?
+            .join(
+                ctx.table("t2").await?,
+                JoinType::Inner,
+                &["c1"],
+                &["c1"],
+                None,
+            )?
+            .sort(vec![
+                // make the test deterministic
+                col("t1.c1").sort(true, true),
+            ])?
+            .limit(0, Some(1))?;
+
+        let df_results = df.clone().collect().await?;
+        assert_batches_sorted_eq!(
+            [
+                "+----+----+",
+                "| c1 | c1 |",
+                "+----+----+",
+                "| a  | a  |",
+                "+----+----+",
+            ],
+            &df_results
+        );
+
+        let df_with_column = df.clone().with_column("new_column", lit(true))?;
+
+        assert_eq!(
+            "\
+        Projection: t1.c1, t2.c1, Boolean(true) AS new_column\
+        \n  Limit: skip=0, fetch=1\
+        \n    Sort: t1.c1 ASC NULLS FIRST\
+        \n      Inner Join: t1.c1 = t2.c1\
+        \n        TableScan: t1\
+        \n        TableScan: t2",
+            format!("{:?}", df_with_column.logical_plan())
+        );
+
+        assert_eq!(
+            "\
+        Projection: t1.c1, t2.c1, Boolean(true) AS new_column\
+        \n  Limit: skip=0, fetch=1\
+        \n    Sort: t1.c1 ASC NULLS FIRST, fetch=1\
+        \n      Inner Join: t1.c1 = t2.c1\
+        \n        SubqueryAlias: t1\
+        \n          TableScan: aggregate_test_100 projection=[c1]\
+        \n        SubqueryAlias: t2\
+        \n          TableScan: aggregate_test_100 projection=[c1]",
+            format!("{:?}", df_with_column.clone().into_optimized_plan()?)
+        );
+
+        let df_results = df_with_column.collect().await?;
+
+        assert_batches_sorted_eq!(
+            [
+                "+----+----+------------+",
+                "| c1 | c1 | new_column |",
+                "+----+----+------------+",
+                "| a  | a  | true       |",
+                "+----+----+------------+",
+            ],
+            &df_results
+        );
+        Ok(())
+    }
+
+    // Table 't1' self join
+    // Supplementary test of issue: https://github.com/apache/arrow-datafusion/issues/7790
+    #[tokio::test]
+    async fn with_column_self_join() -> Result<()> {
+        let df = test_table().await?.select_columns(&["c1"])?;
+        let ctx = SessionContext::new();
+
+        ctx.register_table("t1", df.into_view())?;
+
+        let df = ctx
+            .table("t1")
+            .await?
+            .join(
+                ctx.table("t1").await?,
+                JoinType::Inner,
+                &["c1"],
+                &["c1"],
+                None,
+            )?
+            .sort(vec![
+                // make the test deterministic
+                col("t1.c1").sort(true, true),
+            ])?
+            .limit(0, Some(1))?;
+
+        let df_results = df.clone().collect().await?;
+        assert_batches_sorted_eq!(
+            [
+                "+----+----+",
+                "| c1 | c1 |",
+                "+----+----+",
+                "| a  | a  |",
+                "+----+----+",
+            ],
+            &df_results
+        );
+
+        let actual_err = df.clone().with_column("new_column", lit(true)).unwrap_err();
+        let expected_err = "Error during planning: Projections require unique expression names \
+            but the expression \"t1.c1\" at position 0 and \"t1.c1\" at position 1 have the same name. \
+            Consider aliasing (\"AS\") one of them.";
+        assert_eq!(actual_err.strip_backtrace(), expected_err);
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn with_column_renamed() -> Result<()> {
         let df = test_table()

Reply via email to