gene-bordegaray commented on code in PR #18521:
URL: https://github.com/apache/datafusion/pull/18521#discussion_r2507425235


##########
datafusion/sqllogictest/test_files/joins.slt:
##########
@@ -3269,22 +3262,17 @@ logical_plan
 10)----------TableScan: annotated_data projection=[a0, a, b, c, d]
 physical_plan
 01)SortPreservingMergeExec: [a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST, rn1@11 ASC NULLS LAST]
-02)--SortExec: expr=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST, rn1@11 ASC NULLS LAST], preserve_partitioning=[true]
-03)----SortMergeJoin: join_type=Inner, on=[(a@1, a@1)]
-04)------SortExec: expr=[a@1 ASC], preserve_partitioning=[true]
-05)--------CoalesceBatchesExec: target_batch_size=2
-06)----------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2
-07)------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-08)--------------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
-09)----------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
-10)------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true
-11)------SortExec: expr=[a@1 ASC], preserve_partitioning=[true]
-12)--------CoalesceBatchesExec: target_batch_size=2
-13)----------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2
-14)------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-15)--------------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
-16)----------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
-17)------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true
+02)--SortMergeJoin: join_type=Inner, on=[(a@1, a@1)]
+03)----CoalesceBatchesExec: target_batch_size=2
+04)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1
+05)--------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
+06)----------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
+07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true
+08)----CoalesceBatchesExec: target_batch_size=2
+09)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1

Review Comment:
   I recreated the query from joins.slt and checked the results. Here are the commands I ran and their output:
   
   ```
   -- Settings that joins.slt applies before this query
   SET datafusion.optimizer.prefer_hash_join = false;
   SET datafusion.explain.format = 'indent';
   
   CREATE EXTERNAL TABLE annotated_data (
     a0 INTEGER,
     a INTEGER,
     b INTEGER,
     c INTEGER,
     d INTEGER
   )
   STORED AS CSV
   WITH ORDER (a ASC NULLS FIRST, b ASC, c ASC)
   LOCATION 'datafusion/core/tests/data/window_2.csv'
   OPTIONS ('format.has_header' 'true');
   
   EXPLAIN SELECT *
   FROM (SELECT *, ROW_NUMBER() OVER() as rn1
         FROM annotated_data) as l_table
   JOIN (SELECT *, ROW_NUMBER() OVER() as rn1
         FROM annotated_data) as r_table
   ON l_table.a = r_table.a
   ORDER BY l_table.a ASC NULLS FIRST, l_table.b, l_table.c, r_table.rn1;
   
   SELECT *
   FROM (SELECT *, ROW_NUMBER() OVER() as rn1
         FROM annotated_data) as l_table
   JOIN (SELECT *, ROW_NUMBER() OVER() as rn1
         FROM annotated_data) as r_table
   ON l_table.a = r_table.a
   ORDER BY l_table.a ASC NULLS FIRST, l_table.b, l_table.c, r_table.rn1
   LIMIT 30;
   
   
   +---------------+------
   | plan_type     | plan
   +---------------+------
   | logical_plan  | Sort: l_table.a ASC NULLS FIRST, l_table.b ASC NULLS LAST, l_table.c ASC NULLS LAST, r_table.rn1 ASC NULLS LAST
   |               |   Inner Join: l_table.a = r_table.a
   |               |     SubqueryAlias: l_table
   |               |       Projection: annotated_data.a0, annotated_data.a, annotated_data.b, annotated_data.c, annotated_data.d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS rn1
   |               |         WindowAggr: windowExpr=[[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
   |               |           TableScan: annotated_data projection=[a0, a, b, c, d]
   |               |     SubqueryAlias: r_table
   |               |       Projection: annotated_data.a0, annotated_data.a, annotated_data.b, annotated_data.c, annotated_data.d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS rn1
   |               |         WindowAggr: windowExpr=[[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
   |               |           TableScan: annotated_data projection=[a0, a, b, c, d]
   | physical_plan | SortPreservingMergeExec: [a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST, rn1@11 ASC NULLS LAST]
   |               |   SortMergeJoin: join_type=Inner, on=[(a@1, a@1)]
   |               |     CoalesceBatchesExec: target_batch_size=2
   |               |       RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1
   |               |         ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
   |               |           BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
   |               |             DataSourceExec: file_groups={1 group: [[Users/gene.bordegaray/go/src/github.com/DataDog/datafusion/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true
   |               |     CoalesceBatchesExec: target_batch_size=2
   |               |       RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1
   |               |         ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
   |               |           BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
   |               |             DataSourceExec: file_groups={1 group: [[Users/gene.bordegaray/go/src/github.com/DataDog/datafusion/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true
   |               |
   +---------------+------
   2 row(s) fetched.
   Elapsed 0.003 seconds.
   
   +----+---+---+---+---+-----+----+---+---+----+---+-----+
   | a0 | a | b | c | d | rn1 | a0 | a | b | c  | d | rn1 |
   +----+---+---+---+---+-----+----+---+---+----+---+-----+
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 0  | 0 | 1   |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 1  | 2 | 2   |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 2  | 0 | 3   |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 3  | 0 | 4   |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 4  | 1 | 5   |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 5  | 1 | 6   |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 6  | 0 | 7   |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 7  | 2 | 8   |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 8  | 1 | 9   |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 9  | 4 | 10  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 10 | 4 | 11  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 11 | 2 | 12  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 12 | 2 | 13  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 13 | 1 | 14  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 14 | 2 | 15  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 15 | 3 | 16  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 16 | 3 | 17  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 17 | 2 | 18  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 18 | 1 | 19  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 19 | 4 | 20  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 20 | 0 | 21  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 21 | 3 | 22  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 22 | 0 | 23  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 23 | 0 | 24  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 24 | 4 | 25  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 1 | 25 | 0 | 26  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 1 | 26 | 2 | 27  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 1 | 27 | 0 | 28  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 1 | 28 | 1 | 29  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 1 | 29 | 1 | 30  |
   +----+---+---+---+---+-----+----+---+---+----+---+-----+
   30 row(s) fetched.
   Elapsed 0.003 seconds.
   ```
   
   1. The physical plan here has a hash repartition (`RepartitionExec: partitioning=Hash([a@1], 2)`) on each join input and no `SortExec` node above it. Despite this, the results are still sorted, which indicates that the ordering was preserved.
   2. The plan above does not show when ordering is preserved. This information is tracked in the `maintains_input_order` field, and the EXPLAIN output should display it but currently does not. I can create an issue for this and link it here.
   3. To reproduce this I needed an explicit ordering in both the `WITH ORDER` and `ORDER BY` clauses, and I had to set `datafusion.optimizer.prefer_hash_join` to false to force a `SortMergeJoin`. This means that with the default config, a user with pre-sorted files might miss speedups because unneeded `SortExec` nodes are kept. It might be worth discussing whether there is a better way to handle this, or opening an issue to look into it further.
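   
   A minimal sketch for comparing the two configurations side by side. It assumes the same datafusion-cli session as the reproduction above, so `annotated_data` is already registered. The first `EXPLAIN` uses the default `prefer_hash_join = true`; the second repeats the setting from the reproduction and uses `EXPLAIN VERBOSE`, on the assumption that its per-rule plan listing helps show where the `SortExec` nodes are dropped. Output is intentionally omitted:
   
   ```
   -- Default configuration: hash joins are preferred.
   SET datafusion.optimizer.prefer_hash_join = true;
   
   EXPLAIN SELECT *
   FROM (SELECT *, ROW_NUMBER() OVER() as rn1
         FROM annotated_data) as l_table
   JOIN (SELECT *, ROW_NUMBER() OVER() as rn1
         FROM annotated_data) as r_table
   ON l_table.a = r_table.a
   ORDER BY l_table.a ASC NULLS FIRST, l_table.b, l_table.c, r_table.rn1;
   
   -- Setting used in the reproduction above: prefer SortMergeJoin.
   SET datafusion.optimizer.prefer_hash_join = false;
   
   -- VERBOSE lists the plan after each optimizer rule, not only the final plan.
   EXPLAIN VERBOSE SELECT *
   FROM (SELECT *, ROW_NUMBER() OVER() as rn1
         FROM annotated_data) as l_table
   JOIN (SELECT *, ROW_NUMBER() OVER() as rn1
         FROM annotated_data) as r_table
   ON l_table.a = r_table.a
   ORDER BY l_table.a ASC NULLS FIRST, l_table.b, l_table.c, r_table.rn1;
   ```
   
   Comparing the two physical plans shows which sort-related nodes remain under each setting for the same pre-sorted input.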
   
   Let me know what you think
   @2010YOUY01 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.