gene-bordegaray commented on code in PR #18521:
URL: https://github.com/apache/datafusion/pull/18521#discussion_r2507425235
##########
datafusion/sqllogictest/test_files/joins.slt:
##########
@@ -3269,22 +3262,17 @@ logical_plan
10)----------TableScan: annotated_data projection=[a0, a, b, c, d]
physical_plan
01)SortPreservingMergeExec: [a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST, rn1@11 ASC NULLS LAST]
-02)--SortExec: expr=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST, rn1@11 ASC NULLS LAST], preserve_partitioning=[true]
-03)----SortMergeJoin: join_type=Inner, on=[(a@1, a@1)]
-04)------SortExec: expr=[a@1 ASC], preserve_partitioning=[true]
-05)--------CoalesceBatchesExec: target_batch_size=2
-06)----------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2
-07)------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-08)--------------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
-09)----------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
-10)------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true
-11)------SortExec: expr=[a@1 ASC], preserve_partitioning=[true]
-12)--------CoalesceBatchesExec: target_batch_size=2
-13)----------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2
-14)------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-15)--------------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
-16)----------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
-17)------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true
+02)--SortMergeJoin: join_type=Inner, on=[(a@1, a@1)]
+03)----CoalesceBatchesExec: target_batch_size=2
+04)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1
+05)--------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
+06)----------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
+07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true
+08)----CoalesceBatchesExec: target_batch_size=2
+09)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1
Review Comment:
I recreated the query from joins.slt and checked the results. Here are the commands I ran:
```
-- Settings from joins.slt that are applied before this query
SET datafusion.optimizer.prefer_hash_join = false;
SET datafusion.execution.target_partitions = 2;
SET datafusion.explain.format = 'indent';
CREATE EXTERNAL TABLE annotated_data (
a0 INTEGER,
a INTEGER,
b INTEGER,
c INTEGER,
d INTEGER
)
STORED AS CSV
WITH ORDER (a ASC NULLS FIRST, b ASC, c ASC)
LOCATION 'datafusion/core/tests/data/window_2.csv'
OPTIONS ('format.has_header' 'true');
EXPLAIN SELECT *
FROM (SELECT *, ROW_NUMBER() OVER() as rn1
FROM annotated_data) as l_table
JOIN (SELECT *, ROW_NUMBER() OVER() as rn1
FROM annotated_data) as r_table
ON l_table.a = r_table.a
ORDER BY l_table.a ASC NULLS FIRST, l_table.b, l_table.c, r_table.rn1;
SELECT *
FROM (SELECT *, ROW_NUMBER() OVER() as rn1
FROM annotated_data) as l_table
JOIN (SELECT *, ROW_NUMBER() OVER() as rn1
FROM annotated_data) as r_table
ON l_table.a = r_table.a
ORDER BY l_table.a ASC NULLS FIRST, l_table.b, l_table.c, r_table.rn1
LIMIT 30;
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Sort: l_table.a ASC NULLS FIRST, l_table.b ASC NULLS LAST, l_table.c ASC NULLS LAST, r_table.rn1 ASC NULLS LAST |
|               |   Inner Join: l_table.a = r_table.a |
|               |     SubqueryAlias: l_table |
|               |       Projection: annotated_data.a0, annotated_data.a, annotated_data.b, annotated_data.c, annotated_data.d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS rn1 |
|               |         WindowAggr: windowExpr=[[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] |
|               |           TableScan: annotated_data projection=[a0, a, b, c, d] |
|               |     SubqueryAlias: r_table |
|               |       Projection: annotated_data.a0, annotated_data.a, annotated_data.b, annotated_data.c, annotated_data.d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS rn1 |
|               |         WindowAggr: windowExpr=[[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] |
|               |           TableScan: annotated_data projection=[a0, a, b, c, d] |
| physical_plan | SortPreservingMergeExec: [a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST, rn1@11 ASC NULLS LAST] |
|               |   SortMergeJoin: join_type=Inner, on=[(a@1, a@1)] |
|               |     CoalesceBatchesExec: target_batch_size=2 |
|               |       RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1 |
|               |         ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1] |
|               |           BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted] |
|               |             DataSourceExec: file_groups={1 group: [[Users/gene.bordegaray/go/src/github.com/DataDog/datafusion/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true |
|               |     CoalesceBatchesExec: target_batch_size=2 |
|               |       RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1 |
|               |         ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1] |
|               |           BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted] |
|               |             DataSourceExec: file_groups={1 group: [[Users/gene.bordegaray/go/src/github.com/DataDog/datafusion/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true |
|               | |
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 row(s) fetched.
Elapsed 0.003 seconds.
+----+---+---+---+---+-----+----+---+---+----+---+-----+
| a0 | a | b | c | d | rn1 | a0 | a | b | c | d | rn1 |
+----+---+---+---+---+-----+----+---+---+----+---+-----+
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 0 | 0 | 1 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 1 | 2 | 2 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 2 | 0 | 3 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 3 | 0 | 4 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 4 | 1 | 5 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 5 | 1 | 6 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 6 | 0 | 7 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 7 | 2 | 8 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 8 | 1 | 9 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 9 | 4 | 10 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 10 | 4 | 11 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 11 | 2 | 12 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 12 | 2 | 13 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 13 | 1 | 14 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 14 | 2 | 15 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 15 | 3 | 16 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 16 | 3 | 17 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 17 | 2 | 18 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 18 | 1 | 19 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 19 | 4 | 20 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 20 | 0 | 21 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 21 | 3 | 22 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 22 | 0 | 23 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 23 | 0 | 24 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 0 | 24 | 4 | 25 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 1 | 25 | 0 | 26 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 1 | 26 | 2 | 27 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 1 | 27 | 0 | 28 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 1 | 28 | 1 | 29 |
| 1 | 0 | 0 | 0 | 0 | 1 | 1 | 0 | 1 | 29 | 1 | 30 |
+----+---+---+---+---+-----+----+---+---+----+---+-----+
30 row(s) fetched.
Elapsed 0.003 seconds.
```
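The ordering of the 30 rows above can also be checked mechanically rather than by eye. Below is a minimal Rust sketch; it is not part of the PR or of DataFusion. The names `is_sorted_by_keys`, `SORT_KEYS` and `sample_rows` are hypothetical, the rows are a subset copied by hand from the output, and the key columns 1, 2, 3 and 11 (`l_table.a`, `l_table.b`, `l_table.c`, `r_table.rn1`) are assumed to match the `SortPreservingMergeExec` expression `[a@1, b@2, c@3, rn1@11]`. NULL placement is not modelled because this data contains no NULLs.

```rust
/// Hypothetical helper (not a DataFusion API): returns true if `rows` is
/// non-decreasing when compared lexicographically on the `keys` columns.
/// All keys are treated as ASC; the sample data has no NULLs, so NULL
/// placement is not modelled.
fn is_sorted_by_keys(rows: &[Vec<i64>], keys: &[usize]) -> bool {
    rows.windows(2).all(|pair| {
        let lhs: Vec<i64> = keys.iter().map(|&k| pair[0][k]).collect();
        let rhs: Vec<i64> = keys.iter().map(|&k| pair[1][k]).collect();
        // Vec<i64> orders lexicographically: first key, then the next on ties.
        lhs <= rhs
    })
}

/// Output columns: l_table.(a0, a, b, c, d, rn1), r_table.(a0, a, b, c, d, rn1).
/// Sort keys: l_table.a = 1, l_table.b = 2, l_table.c = 3, r_table.rn1 = 11.
const SORT_KEYS: [usize; 4] = [1, 2, 3, 11];

/// A subset of rows copied, in their original order, from the result above.
fn sample_rows() -> Vec<Vec<i64>> {
    vec![
        vec![1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 1],
        vec![1, 0, 0, 0, 0, 1, 1, 0, 0, 1, 2, 2],
        vec![1, 0, 0, 0, 0, 1, 1, 0, 0, 2, 0, 3],
        vec![1, 0, 0, 0, 0, 1, 1, 0, 0, 23, 0, 24],
        vec![1, 0, 0, 0, 0, 1, 1, 0, 0, 24, 4, 25],
        vec![1, 0, 0, 0, 0, 1, 1, 0, 1, 25, 0, 26],
        vec![1, 0, 0, 0, 0, 1, 1, 0, 1, 26, 2, 27],
    ]
}

fn main() {
    let rows = sample_rows();
    println!("sorted: {}", is_sorted_by_keys(&rows, &SORT_KEYS));

    // Swapping the first two rows breaks the r_table.rn1 order.
    let mut shuffled = rows.clone();
    shuffled.swap(0, 1);
    println!("shuffled sorted: {}", is_sorted_by_keys(&shuffled, &SORT_KEYS));
}
```

Running it should print `sorted: true` followed by `shuffled sorted: false`.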
1. The physical plan here has a hash `RepartitionExec` on `a@1` and no `SortExec` above it. Even so, the results are still sorted, which means the sort order was preserved through the repartition.
2. As the plan above shows, EXPLAIN does not indicate when an operator preserves its input order. This is tracked by `maintains_input_order`, so EXPLAIN should display it, but currently does not. I can create an issue for this and link it here.
3. To recreate this I had to have: explicit ordering in the ORDER BY clause,
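A key difference in the new plan is that each hash `RepartitionExec` has `input_partitions=1`, where the old plan fed it from a `RoundRobinBatch(2)` repartition. The toy model below is plain Rust, not DataFusion's `RepartitionExec`. It illustrates why a single input stream matters, under the assumption that rows are forwarded one at a time, in arrival order, to partition `hash(a) % n`. Each output is then a subsequence of a sorted input and therefore stays sorted. With two inputs, rows from different inputs can interleave, and the order is lost. `Row`, `hash_repartition`, `round_robin_batches` and `is_sorted` are hypothetical names. `DefaultHasher` stands in for DataFusion's own hash function. The "drain input 0, then input 1" schedule is just one possible execution order.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// A row reduced to (join key `a`, position in the original sorted input).
type Row = (i64, u64);

/// Toy model of hash repartitioning (assumption: row-at-a-time routing in
/// arrival order; DataFusion actually works on record batches).
fn hash_repartition(input: &[Row], n: usize) -> Vec<Vec<Row>> {
    let mut outputs: Vec<Vec<Row>> = vec![Vec::new(); n];
    for &row in input {
        let mut hasher = DefaultHasher::new();
        row.0.hash(&mut hasher);
        let partition = (hasher.finish() % n as u64) as usize;
        // Appending keeps arrival order within each output partition.
        outputs[partition].push(row);
    }
    outputs
}

/// Toy model of RoundRobinBatch: deal consecutive batches of `batch_size`
/// rows to `n` partitions in turn.
fn round_robin_batches(input: &[Row], batch_size: usize, n: usize) -> Vec<Vec<Row>> {
    let mut outputs: Vec<Vec<Row>> = vec![Vec::new(); n];
    for (i, batch) in input.chunks(batch_size).enumerate() {
        outputs[i % n].extend_from_slice(batch);
    }
    outputs
}

/// True if rows are non-decreasing by (a, position).
fn is_sorted(rows: &[Row]) -> bool {
    rows.windows(2).all(|w| w[0] <= w[1])
}

fn main() {
    // One input partition sorted on `a` (5 keys, 4 rows each), standing in
    // for the sorted DataSourceExec output in the plan above.
    let input: Vec<Row> = (0..20u64).map(|i| ((i / 4) as i64, i)).collect();

    // Single input stream: every output partition stays sorted.
    for (i, part) in hash_repartition(&input, 2).iter().enumerate() {
        println!("1 input,  partition {i}: sorted = {}", is_sorted(part));
    }

    // Two input streams (round robin, batch size 2), drained under one
    // possible schedule: all of input 0, then all of input 1.
    let drained = round_robin_batches(&input, 2, 2).concat();
    for (i, part) in hash_repartition(&drained, 2).iter().enumerate() {
        println!("2 inputs, partition {i}: sorted = {}", is_sorted(part));
    }
}
```

In the two-input case, any output partition that receives two or more keys ends up unsorted. With five keys and two partitions, at least one partition always does.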
--
This is an automated message from the Apache Git Service.