This is an automated email from the ASF dual-hosted git repository.

xudong963 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new ecc5694840 fix: FULL OUTER JOIN and LIMIT produces wrong results (#14338)
ecc5694840 is described below

commit ecc5694840249e1c18e9132a9833d00819749a45
Author: Qi Zhu <[email protected]>
AuthorDate: Wed Jan 29 00:43:12 2025 +0800

    fix: FULL OUTER JOIN and LIMIT produces wrong results (#14338)
    
    * fix: FULL OUTER JOIN and LIMIT produces wrong results
    
    * Fix minor slt testing
    
    * fix test
---
 datafusion/optimizer/src/push_down_limit.rs  |   1 -
 datafusion/sqllogictest/test_files/joins.slt | 159 +++++++++++++++++++++++++--
 2 files changed, 147 insertions(+), 13 deletions(-)

diff --git a/datafusion/optimizer/src/push_down_limit.rs b/datafusion/optimizer/src/push_down_limit.rs
index 8a3aa4bb84..4da112d515 100644
--- a/datafusion/optimizer/src/push_down_limit.rs
+++ b/datafusion/optimizer/src/push_down_limit.rs
@@ -255,7 +255,6 @@ fn push_down_join(mut join: Join, limit: usize) -> Transformed<Join> {
         match join.join_type {
             Left => (Some(limit), None),
             Right => (None, Some(limit)),
-            Full => (Some(limit), Some(limit)),
             _ => (None, None),
         }
     };
diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt
index 496c6c609e..ac02aeb6fe 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -4240,10 +4240,8 @@ EXPLAIN SELECT * FROM t0 FULL JOIN t1 ON t0.c1 = t1.c1 LIMIT 2;
 logical_plan
 01)Limit: skip=0, fetch=2
 02)--Full Join: t0.c1 = t1.c1
-03)----Limit: skip=0, fetch=2
-04)------TableScan: t0 projection=[c1, c2], fetch=2
-05)----Limit: skip=0, fetch=2
-06)------TableScan: t1 projection=[c1, c2, c3], fetch=2
+03)----TableScan: t0 projection=[c1, c2]
+04)----TableScan: t1 projection=[c1, c2, c3]
 physical_plan
 01)CoalesceBatchesExec: target_batch_size=3, fetch=2
 02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)]
@@ -4257,10 +4255,8 @@ EXPLAIN SELECT * FROM t0 FULL JOIN t1 ON t0.c2 >= t1.c2 LIMIT 2;
 logical_plan
 01)Limit: skip=0, fetch=2
 02)--Full Join:  Filter: t0.c2 >= t1.c2
-03)----Limit: skip=0, fetch=2
-04)------TableScan: t0 projection=[c1, c2], fetch=2
-05)----Limit: skip=0, fetch=2
-06)------TableScan: t1 projection=[c1, c2, c3], fetch=2
+03)----TableScan: t0 projection=[c1, c2]
+04)----TableScan: t1 projection=[c1, c2, c3]
 physical_plan
 01)GlobalLimitExec: skip=0, fetch=2
 02)--NestedLoopJoinExec: join_type=Full, filter=c2@0 >= c2@1
@@ -4274,16 +4270,155 @@ EXPLAIN SELECT * FROM t0 FULL JOIN t1 ON t0.c1 = t1.c1 AND t0.c2 >= t1.c2 LIMIT
 logical_plan
 01)Limit: skip=0, fetch=2
 02)--Full Join: t0.c1 = t1.c1 Filter: t0.c2 >= t1.c2
-03)----Limit: skip=0, fetch=2
-04)------TableScan: t0 projection=[c1, c2], fetch=2
-05)----Limit: skip=0, fetch=2
-06)------TableScan: t1 projection=[c1, c2, c3], fetch=2
+03)----TableScan: t0 projection=[c1, c2]
+04)----TableScan: t1 projection=[c1, c2, c3]
 physical_plan
 01)CoalesceBatchesExec: target_batch_size=3, fetch=2
 02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)], filter=c2@0 >= c2@1
 03)----MemoryExec: partitions=1, partition_sizes=[1]
 04)----MemoryExec: partitions=1, partition_sizes=[1]
 
+## Add more test cases for join limit pushdown
+statement ok
+drop table t1
+
+## Test limit pushdown through OUTER JOIN including left/right and full outer join cases
+statement ok
+set datafusion.execution.target_partitions = 1;
+
+### Limit pushdown through join
+
+# Note we use csv as MemoryExec does not support limit push down (so doesn't manifest
+# bugs if limits are improperly pushed down)
+query I
+COPY (values (1), (2), (3), (4), (5))  TO 'test_files/scratch/limit/t1.csv'
+STORED AS CSV
+----
+5
+
+# store t2 in different order so the top N rows are not the same as the top N rows of t1
+query I
+COPY (values (5), (4), (3), (2), (1))  TO 'test_files/scratch/limit/t2.csv'
+STORED AS CSV
+----
+5
+
+statement ok
+create external table t1(a int) stored as CSV location 'test_files/scratch/limit/t1.csv';
+
+statement ok
+create external table t2(b int) stored as CSV location 'test_files/scratch/limit/t2.csv';
+
+######
+## LEFT JOIN w/ LIMIT
+######
+query II
+select * from t1 LEFT JOIN t2 ON t1.a = t2.b LIMIT 2;
+----
+2 2
+1 1
+
+# the output of this query should be two rows from the previous query
+# there should be no nulls
+query II
+select * from t1 LEFT JOIN t2 ON t1.a = t2.b LIMIT 2;
+----
+2 2
+1 1
+
+# can only push down to t1 (preserved side)
+query TT
+explain select * from t1 LEFT JOIN t2 ON t1.a = t2.b LIMIT 2;
+----
+logical_plan
+01)Limit: skip=0, fetch=2
+02)--Left Join: t1.a = t2.b
+03)----Limit: skip=0, fetch=2
+04)------TableScan: t1 projection=[a], fetch=2
+05)----TableScan: t2 projection=[b]
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=3, fetch=2
+02)--HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, b@0)]
+03)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t1.csv]]}, projection=[a], limit=2, has_header=true
+04)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t2.csv]]}, projection=[b], has_header=true
+
+######
+## RIGHT JOIN w/ LIMIT
+######
+
+query II
+select * from t1 RIGHT JOIN t2 ON t1.a = t2.b LIMIT 2;
+----
+5 5
+4 4
+
+# the output of this query should be two rows from the previous query
+# there should be no nulls
+query II
+select * from t1 RIGHT JOIN t2 ON t1.a = t2.b LIMIT 2;
+----
+5 5
+4 4
+
+# can only push down to t2 (preserved side)
+query TT
+explain select * from t1 RIGHT JOIN t2 ON t1.a = t2.b LIMIT 2;
+----
+logical_plan
+01)Limit: skip=0, fetch=2
+02)--Right Join: t1.a = t2.b
+03)----TableScan: t1 projection=[a]
+04)----Limit: skip=0, fetch=2
+05)------TableScan: t2 projection=[b], fetch=2
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=3, fetch=2
+02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(a@0, b@0)]
+03)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t1.csv]]}, projection=[a], has_header=true
+04)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t2.csv]]}, projection=[b], limit=2, has_header=true
+
+######
+## FULL JOIN w/ LIMIT
+######
+query II rowsort
+select * from t1 FULL JOIN t2 ON t1.a = t2.b;
+----
+1 1
+2 2
+3 3
+4 4
+5 5
+
+# the output of this query should be two rows from the previous query
+# there should be no nulls
+# Reproducer for https://github.com/apache/datafusion/issues/14335
+query II
+select * from t1 FULL JOIN t2 ON t1.a = t2.b LIMIT 2;
+----
+5 5
+4 4
+
+
+# can't push limit for full outer join
+query TT
+explain select * from t1 FULL JOIN t2 ON t1.a = t2.b LIMIT 2;
+----
+logical_plan
+01)Limit: skip=0, fetch=2
+02)--Full Join: t1.a = t2.b
+03)----TableScan: t1 projection=[a]
+04)----TableScan: t2 projection=[b]
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=3, fetch=2
+02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(a@0, b@0)]
+03)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t1.csv]]}, projection=[a], has_header=true
+04)----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t2.csv]]}, projection=[b], has_header=true
+
+statement ok
+drop table t1;
+
+statement ok
+drop table t2;
+
 # Test Utf8View as Join Key
 # Issue: https://github.com/apache/datafusion/issues/12468
 statement ok


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to