This is an automated email from the ASF dual-hosted git repository.
xudong963 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new ecc5694840 fix: FULL OUTER JOIN and LIMIT produces wrong results (#14338)
ecc5694840 is described below
commit ecc5694840249e1c18e9132a9833d00819749a45
Author: Qi Zhu <[email protected]>
AuthorDate: Wed Jan 29 00:43:12 2025 +0800
fix: FULL OUTER JOIN and LIMIT produces wrong results (#14338)
* fix: FULL OUTER JOIN and LIMIT produces wrong results
* Fix minor slt testing
* fix test
---
datafusion/optimizer/src/push_down_limit.rs | 1 -
datafusion/sqllogictest/test_files/joins.slt | 159 +++++++++++++++++++++++++--
2 files changed, 147 insertions(+), 13 deletions(-)
diff --git a/datafusion/optimizer/src/push_down_limit.rs b/datafusion/optimizer/src/push_down_limit.rs
index 8a3aa4bb84..4da112d515 100644
--- a/datafusion/optimizer/src/push_down_limit.rs
+++ b/datafusion/optimizer/src/push_down_limit.rs
@@ -255,7 +255,6 @@ fn push_down_join(mut join: Join, limit: usize) -> Transformed<Join> {
match join.join_type {
Left => (Some(limit), None),
Right => (None, Some(limit)),
- Full => (Some(limit), Some(limit)),
_ => (None, None),
}
};
diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt
index 496c6c609e..ac02aeb6fe 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -4240,10 +4240,8 @@ EXPLAIN SELECT * FROM t0 FULL JOIN t1 ON t0.c1 = t1.c1 LIMIT 2;
logical_plan
01)Limit: skip=0, fetch=2
02)--Full Join: t0.c1 = t1.c1
-03)----Limit: skip=0, fetch=2
-04)------TableScan: t0 projection=[c1, c2], fetch=2
-05)----Limit: skip=0, fetch=2
-06)------TableScan: t1 projection=[c1, c2, c3], fetch=2
+03)----TableScan: t0 projection=[c1, c2]
+04)----TableScan: t1 projection=[c1, c2, c3]
physical_plan
01)CoalesceBatchesExec: target_batch_size=3, fetch=2
02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)]
@@ -4257,10 +4255,8 @@ EXPLAIN SELECT * FROM t0 FULL JOIN t1 ON t0.c2 >= t1.c2 LIMIT 2;
logical_plan
01)Limit: skip=0, fetch=2
02)--Full Join: Filter: t0.c2 >= t1.c2
-03)----Limit: skip=0, fetch=2
-04)------TableScan: t0 projection=[c1, c2], fetch=2
-05)----Limit: skip=0, fetch=2
-06)------TableScan: t1 projection=[c1, c2, c3], fetch=2
+03)----TableScan: t0 projection=[c1, c2]
+04)----TableScan: t1 projection=[c1, c2, c3]
physical_plan
01)GlobalLimitExec: skip=0, fetch=2
02)--NestedLoopJoinExec: join_type=Full, filter=c2@0 >= c2@1
@@ -4274,16 +4270,155 @@ EXPLAIN SELECT * FROM t0 FULL JOIN t1 ON t0.c1 = t1.c1 AND t0.c2 >= t1.c2 LIMIT
logical_plan
01)Limit: skip=0, fetch=2
02)--Full Join: t0.c1 = t1.c1 Filter: t0.c2 >= t1.c2
-03)----Limit: skip=0, fetch=2
-04)------TableScan: t0 projection=[c1, c2], fetch=2
-05)----Limit: skip=0, fetch=2
-06)------TableScan: t1 projection=[c1, c2, c3], fetch=2
+03)----TableScan: t0 projection=[c1, c2]
+04)----TableScan: t1 projection=[c1, c2, c3]
physical_plan
01)CoalesceBatchesExec: target_batch_size=3, fetch=2
02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(c1@0, c1@0)],
filter=c2@0 >= c2@1
03)----MemoryExec: partitions=1, partition_sizes=[1]
04)----MemoryExec: partitions=1, partition_sizes=[1]
+## Add more test cases for join limit pushdown
+statement ok
+drop table t1
+
+## Test limit pushdown through OUTER JOIN including left/right and full outer join cases
+statement ok
+set datafusion.execution.target_partitions = 1;
+
+### Limit pushdown through join
+
+# Note we use csv as MemoryExec does not support limit push down (so doesn't manifest
+# bugs if limits are improperly pushed down)
+query I
+COPY (values (1), (2), (3), (4), (5)) TO 'test_files/scratch/limit/t1.csv'
+STORED AS CSV
+----
+5
+
+# store t2 in different order so the top N rows are not the same as the top N rows of t1
+query I
+COPY (values (5), (4), (3), (2), (1)) TO 'test_files/scratch/limit/t2.csv'
+STORED AS CSV
+----
+5
+
+statement ok
+create external table t1(a int) stored as CSV location
'test_files/scratch/limit/t1.csv';
+
+statement ok
+create external table t2(b int) stored as CSV location
'test_files/scratch/limit/t2.csv';
+
+######
+## LEFT JOIN w/ LIMIT
+######
+query II
+select * from t1 LEFT JOIN t2 ON t1.a = t2.b LIMIT 2;
+----
+2 2
+1 1
+
+# the output of this query should be two rows from the previous query
+# there should be no nulls
+query II
+select * from t1 LEFT JOIN t2 ON t1.a = t2.b LIMIT 2;
+----
+2 2
+1 1
+
+# can only push down to t1 (preserved side)
+query TT
+explain select * from t1 LEFT JOIN t2 ON t1.a = t2.b LIMIT 2;
+----
+logical_plan
+01)Limit: skip=0, fetch=2
+02)--Left Join: t1.a = t2.b
+03)----Limit: skip=0, fetch=2
+04)------TableScan: t1 projection=[a], fetch=2
+05)----TableScan: t2 projection=[b]
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=3, fetch=2
+02)--HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, b@0)]
+03)----CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t1.csv]]},
projection=[a], limit=2, has_header=true
+04)----CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t2.csv]]},
projection=[b], has_header=true
+
+######
+## RIGHT JOIN w/ LIMIT
+######
+
+query II
+select * from t1 RIGHT JOIN t2 ON t1.a = t2.b LIMIT 2;
+----
+5 5
+4 4
+
+# the output of this query should be two rows from the previous query
+# there should be no nulls
+query II
+select * from t1 RIGHT JOIN t2 ON t1.a = t2.b LIMIT 2;
+----
+5 5
+4 4
+
+# can only push down to t2 (preserved side)
+query TT
+explain select * from t1 RIGHT JOIN t2 ON t1.a = t2.b LIMIT 2;
+----
+logical_plan
+01)Limit: skip=0, fetch=2
+02)--Right Join: t1.a = t2.b
+03)----TableScan: t1 projection=[a]
+04)----Limit: skip=0, fetch=2
+05)------TableScan: t2 projection=[b], fetch=2
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=3, fetch=2
+02)--HashJoinExec: mode=CollectLeft, join_type=Right, on=[(a@0, b@0)]
+03)----CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t1.csv]]},
projection=[a], has_header=true
+04)----CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t2.csv]]},
projection=[b], limit=2, has_header=true
+
+######
+## FULL JOIN w/ LIMIT
+######
+query II rowsort
+select * from t1 FULL JOIN t2 ON t1.a = t2.b;
+----
+1 1
+2 2
+3 3
+4 4
+5 5
+
+# the output of this query should be two rows from the previous query
+# there should be no nulls
+# Reproducer for https://github.com/apache/datafusion/issues/14335
+query II
+select * from t1 FULL JOIN t2 ON t1.a = t2.b LIMIT 2;
+----
+5 5
+4 4
+
+
+# can't push limit for full outer join
+query TT
+explain select * from t1 FULL JOIN t2 ON t1.a = t2.b LIMIT 2;
+----
+logical_plan
+01)Limit: skip=0, fetch=2
+02)--Full Join: t1.a = t2.b
+03)----TableScan: t1 projection=[a]
+04)----TableScan: t2 projection=[b]
+physical_plan
+01)CoalesceBatchesExec: target_batch_size=3, fetch=2
+02)--HashJoinExec: mode=CollectLeft, join_type=Full, on=[(a@0, b@0)]
+03)----CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t1.csv]]},
projection=[a], has_header=true
+04)----CsvExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit/t2.csv]]},
projection=[b], has_header=true
+
+statement ok
+drop table t1;
+
+statement ok
+drop table t2;
+
# Test Utf8View as Join Key
# Issue: https://github.com/apache/datafusion/issues/12468
statement ok
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]