This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 6cca0f7f37 Fix bug when pushing projection under joins (#11333)
6cca0f7f37 is described below

commit 6cca0f7f3725406aef4deb1ff1bbe299867ce82c
Author: Jonah Gao <[email protected]>
AuthorDate: Wed Jul 10 05:08:22 2024 +0800

    Fix bug when pushing projection under joins (#11333)
    
    * Fix bug in `ProjectionPushdown`
    
    * add order by
    
    * Fix join on
---
 .../src/physical_optimizer/projection_pushdown.rs  | 50 ++++++++++++-------
 datafusion/sqllogictest/test_files/join.slt        | 58 ++++++++++++++++++++++
 2 files changed, 89 insertions(+), 19 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/projection_pushdown.rs b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
index 70524dfcea..3c2be59f75 100644
--- a/datafusion/core/src/physical_optimizer/projection_pushdown.rs
+++ b/datafusion/core/src/physical_optimizer/projection_pushdown.rs
@@ -46,7 +46,7 @@ use datafusion_common::config::ConfigOptions;
 use datafusion_common::tree_node::{
     Transformed, TransformedResult, TreeNode, TreeNodeRecursion,
 };
-use datafusion_common::{DataFusionError, JoinSide};
+use datafusion_common::{internal_err, JoinSide};
 use datafusion_physical_expr::expressions::{Column, Literal};
 use datafusion_physical_expr::{
     utils::collect_columns, Partitioning, PhysicalExpr, PhysicalExprRef,
@@ -640,6 +640,7 @@ fn try_pushdown_through_hash_join(
         &projection_as_columns[0..=far_right_left_col_ind as _],
         &projection_as_columns[far_left_right_col_ind as _..],
         hash_join.on(),
+        hash_join.left().schema().fields().len(),
     ) else {
         return Ok(None);
     };
@@ -649,8 +650,7 @@ fn try_pushdown_through_hash_join(
             &projection_as_columns[0..=far_right_left_col_ind as _],
             &projection_as_columns[far_left_right_col_ind as _..],
             filter,
-            hash_join.left(),
-            hash_join.right(),
+            hash_join.left().schema().fields().len(),
         ) {
             Some(updated_filter) => Some(updated_filter),
             None => return Ok(None),
@@ -750,8 +750,7 @@ fn try_swapping_with_nested_loop_join(
             &projection_as_columns[0..=far_right_left_col_ind as _],
             &projection_as_columns[far_left_right_col_ind as _..],
             filter,
-            nl_join.left(),
-            nl_join.right(),
+            nl_join.left().schema().fields().len(),
         ) {
             Some(updated_filter) => Some(updated_filter),
             None => return Ok(None),
@@ -806,6 +805,7 @@ fn try_swapping_with_sort_merge_join(
         &projection_as_columns[0..=far_right_left_col_ind as _],
         &projection_as_columns[far_left_right_col_ind as _..],
         sm_join.on(),
+        sm_join.left().schema().fields().len(),
     ) else {
         return Ok(None);
     };
@@ -859,6 +859,7 @@ fn try_swapping_with_sym_hash_join(
         &projection_as_columns[0..=far_right_left_col_ind as _],
         &projection_as_columns[far_left_right_col_ind as _..],
         sym_join.on(),
+        sym_join.left().schema().fields().len(),
     ) else {
         return Ok(None);
     };
@@ -868,8 +869,7 @@ fn try_swapping_with_sym_hash_join(
             &projection_as_columns[0..=far_right_left_col_ind as _],
             &projection_as_columns[far_left_right_col_ind as _..],
             filter,
-            sym_join.left(),
-            sym_join.right(),
+            sym_join.left().schema().fields().len(),
         ) {
             Some(updated_filter) => Some(updated_filter),
             None => return Ok(None),
@@ -1090,6 +1090,7 @@ fn update_join_on(
     proj_left_exprs: &[(Column, String)],
     proj_right_exprs: &[(Column, String)],
     hash_join_on: &[(PhysicalExprRef, PhysicalExprRef)],
+    left_field_size: usize,
 ) -> Option<Vec<(PhysicalExprRef, PhysicalExprRef)>> {
     // TODO: Clippy wants the "map" call removed, but doing so generates
     //       a compilation error. Remove the clippy directive once this
@@ -1100,8 +1101,9 @@ fn update_join_on(
         .map(|(left, right)| (left, right))
         .unzip();
 
-    let new_left_columns = new_columns_for_join_on(&left_idx, proj_left_exprs);
-    let new_right_columns = new_columns_for_join_on(&right_idx, 
proj_right_exprs);
+    let new_left_columns = new_columns_for_join_on(&left_idx, proj_left_exprs, 
0);
+    let new_right_columns =
+        new_columns_for_join_on(&right_idx, proj_right_exprs, left_field_size);
 
     match (new_left_columns, new_right_columns) {
         (Some(left), Some(right)) => 
Some(left.into_iter().zip(right).collect()),
@@ -1112,9 +1114,14 @@ fn update_join_on(
 /// This function generates a new set of columns to be used in a hash join
 /// operation based on a set of equi-join conditions (`hash_join_on`) and a
 /// list of projection expressions (`projection_exprs`).
+///
+/// Notes: Column indices in the projection expressions are based on the join schema,
+/// whereas the join on expressions are based on the join child schema. `column_index_offset`
+/// represents the offset between them.
 fn new_columns_for_join_on(
     hash_join_on: &[&PhysicalExprRef],
     projection_exprs: &[(Column, String)],
+    column_index_offset: usize,
 ) -> Option<Vec<PhysicalExprRef>> {
     let new_columns = hash_join_on
         .iter()
@@ -1130,6 +1137,8 @@ fn new_columns_for_join_on(
                             .enumerate()
                             .find(|(_, (proj_column, _))| {
                                 column.name() == proj_column.name()
+                                    && column.index() + column_index_offset
+                                        == proj_column.index()
                             })
                             .map(|(index, (_, alias))| Column::new(alias, 
index));
                         if let Some(new_column) = new_column {
@@ -1138,10 +1147,10 @@ fn new_columns_for_join_on(
                             // If the column is not found in the projection 
expressions,
                             // it means that the column is not projected. In 
this case,
                             // we cannot push the projection down.
-                            Err(DataFusionError::Internal(format!(
+                            internal_err!(
                                 "Column {:?} not found in projection 
expressions",
                                 column
-                            )))
+                            )
                         }
                     } else {
                         Ok(Transformed::no(expr))
@@ -1160,21 +1169,20 @@ fn update_join_filter(
     projection_left_exprs: &[(Column, String)],
     projection_right_exprs: &[(Column, String)],
     join_filter: &JoinFilter,
-    join_left: &Arc<dyn ExecutionPlan>,
-    join_right: &Arc<dyn ExecutionPlan>,
+    left_field_size: usize,
 ) -> Option<JoinFilter> {
     let mut new_left_indices = new_indices_for_join_filter(
         join_filter,
         JoinSide::Left,
         projection_left_exprs,
-        join_left.schema(),
+        0,
     )
     .into_iter();
     let mut new_right_indices = new_indices_for_join_filter(
         join_filter,
         JoinSide::Right,
         projection_right_exprs,
-        join_right.schema(),
+        left_field_size,
     )
     .into_iter();
 
@@ -1204,20 +1212,24 @@ fn update_join_filter(
 /// This function determines and returns a vector of indices representing the
 /// positions of columns in `projection_exprs` that are involved in 
`join_filter`,
 /// and correspond to a particular side (`join_side`) of the join operation.
+///
+/// Notes: Column indices in the projection expressions are based on the join schema,
+/// whereas the join filter is based on the join child schema. `column_index_offset`
+/// represents the offset between them.
 fn new_indices_for_join_filter(
     join_filter: &JoinFilter,
     join_side: JoinSide,
     projection_exprs: &[(Column, String)],
-    join_child_schema: SchemaRef,
+    column_index_offset: usize,
 ) -> Vec<usize> {
     join_filter
         .column_indices()
         .iter()
         .filter(|col_idx| col_idx.side == join_side)
         .filter_map(|col_idx| {
-            projection_exprs.iter().position(|(col, _)| {
-                col.name() == join_child_schema.fields()[col_idx.index].name()
-            })
+            projection_exprs
+                .iter()
+                .position(|(col, _)| col_idx.index + column_index_offset == 
col.index())
         })
         .collect()
 }
diff --git a/datafusion/sqllogictest/test_files/join.slt b/datafusion/sqllogictest/test_files/join.slt
index 3c89109145..12cb8b3985 100644
--- a/datafusion/sqllogictest/test_files/join.slt
+++ b/datafusion/sqllogictest/test_files/join.slt
@@ -986,3 +986,61 @@ DROP TABLE employees
 
 statement ok
 DROP TABLE department
+
+
+# Test issue: https://github.com/apache/datafusion/issues/11269
+statement ok
+CREATE TABLE t1 (v0 BIGINT) AS VALUES (-503661263);
+
+statement ok
+CREATE TABLE t2 (v0 DOUBLE) AS VALUES (-1.663563947387);
+
+statement ok
+CREATE TABLE t3 (v0 DOUBLE) AS VALUES (0.05112015193508901);
+
+query RR
+SELECT t3.v0, t2.v0 FROM t1,t2,t3 WHERE t3.v0 >= t1.v0;
+----
+0.051120151935 -1.663563947387
+
+statement ok
+DROP TABLE t1;
+
+statement ok
+DROP TABLE t2;
+
+statement ok
+DROP TABLE t3;
+
+
+# Test issue: https://github.com/apache/datafusion/issues/11275
+statement ok
+CREATE TABLE t0 (v1 BOOLEAN) AS VALUES (false), (null);
+
+statement ok
+CREATE TABLE t1 (v1 BOOLEAN) AS VALUES (false), (null), (false);
+
+statement ok
+CREATE TABLE t2 (v1 BOOLEAN) AS VALUES (false), (true);
+
+query BB
+SELECT t2.v1, t1.v1 FROM t0, t1, t2 WHERE t2.v1 IS DISTINCT FROM t0.v1 ORDER 
BY 1,2;
+----
+false false
+false false
+false NULL
+true false
+true false
+true false
+true false
+true NULL
+true NULL
+
+statement ok
+DROP TABLE t0;
+
+statement ok
+DROP TABLE t1;
+
+statement ok
+DROP TABLE t2;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to