gene-bordegaray commented on code in PR #18521:
URL: https://github.com/apache/datafusion/pull/18521#discussion_r2507425235


##########
datafusion/sqllogictest/test_files/joins.slt:
##########
@@ -3269,22 +3262,17 @@ logical_plan
 10)----------TableScan: annotated_data projection=[a0, a, b, c, d]
 physical_plan
 01)SortPreservingMergeExec: [a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST, rn1@11 ASC NULLS LAST]
-02)--SortExec: expr=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST, rn1@11 ASC NULLS LAST], preserve_partitioning=[true]
-03)----SortMergeJoin: join_type=Inner, on=[(a@1, a@1)]
-04)------SortExec: expr=[a@1 ASC], preserve_partitioning=[true]
-05)--------CoalesceBatchesExec: target_batch_size=2
-06)----------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2
-07)------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-08)--------------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
-09)----------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
-10)------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true
-11)------SortExec: expr=[a@1 ASC], preserve_partitioning=[true]
-12)--------CoalesceBatchesExec: target_batch_size=2
-13)----------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2
-14)------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1
-15)--------------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
-16)----------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
-17)------------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true
+02)--SortMergeJoin: join_type=Inner, on=[(a@1, a@1)]
+03)----CoalesceBatchesExec: target_batch_size=2
+04)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1
+05)--------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1]
+06)----------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
+07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], file_type=csv, has_header=true
+08)----CoalesceBatchesExec: target_batch_size=2
+09)------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=1

Review Comment:
   I recreated the query from joins.slt and checked the results. Here is how to 
reproduce it:
   
   ```
   -- Settings that joins.slt applies before the query
   SET datafusion.execution.batch_size = 2;
   SET datafusion.explain.logical_plan_only = false;
   SET datafusion.optimizer.prefer_hash_join = false;
   SET datafusion.execution.target_partitions = 2;
   SET datafusion.optimizer.prefer_existing_sort = false;
   SET datafusion.explain.format = 'indent';
   
   CREATE EXTERNAL TABLE annotated_data (
     a0 INTEGER,
     a INTEGER,
     b INTEGER,
     c INTEGER,
     d INTEGER
   )
   STORED AS CSV
   WITH ORDER (a ASC NULLS FIRST, b ASC, c ASC)
   LOCATION 'datafusion/core/tests/data/window_2.csv'
   OPTIONS ('format.has_header' 'true');
   
   EXPLAIN SELECT *
   FROM (SELECT *, ROW_NUMBER() OVER() as rn1
         FROM annotated_data) as l_table
   JOIN (SELECT *, ROW_NUMBER() OVER() as rn1
         FROM annotated_data) as r_table
   ON l_table.a = r_table.a
   ORDER BY l_table.a ASC NULLS FIRST, l_table.b, l_table.c, r_table.rn1;
   
   SELECT *
   FROM (SELECT *, ROW_NUMBER() OVER() as rn1
         FROM annotated_data) as l_table
   JOIN (SELECT *, ROW_NUMBER() OVER() as rn1
         FROM annotated_data) as r_table
   ON l_table.a = r_table.a
   ORDER BY l_table.a ASC NULLS FIRST, l_table.b, l_table.c, r_table.rn1
   LIMIT 30;
   
   
   
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | plan_type     | plan                                                       
                                                                                
                                                                                
                                                               |
   
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   | logical_plan  | Sort: l_table.a ASC NULLS FIRST, l_table.b ASC NULLS LAST, 
l_table.c ASC NULLS LAST, r_table.rn1 ASC NULLS LAST                            
                                                                                
                                                               |
   |               |   Inner Join: l_table.a = r_table.a                        
                                                                                
                                                                                
                                                               |
   |               |     SubqueryAlias: l_table                                 
                                                                                
                                                                                
                                                               |
   |               |       Projection: annotated_data.a0, annotated_data.a, 
annotated_data.b, annotated_data.c, annotated_data.d, row_number() ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS rn1                              
                                                                   |
   |               |         WindowAggr: windowExpr=[[row_number() ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]                                   
                                                                                
                                                               |
   |               |           TableScan: annotated_data projection=[a0, a, b, 
c, d]                                                                           
                                                                                
                                                                |
   |               |     SubqueryAlias: r_table                                 
                                                                                
                                                                                
                                                               |
   |               |       Projection: annotated_data.a0, annotated_data.a, 
annotated_data.b, annotated_data.c, annotated_data.d, row_number() ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS rn1                              
                                                                   |
   |               |         WindowAggr: windowExpr=[[row_number() ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]                                   
                                                                                
                                                               |
   |               |           TableScan: annotated_data projection=[a0, a, b, 
c, d]                                                                           
                                                                                
                                                                |
   | physical_plan | SortPreservingMergeExec: [a@1 ASC, b@2 ASC NULLS LAST, c@3 
ASC NULLS LAST, rn1@11 ASC NULLS LAST]                                          
                                                                                
                                                               |
   |               |   SortMergeJoin: join_type=Inner, on=[(a@1, a@1)]          
                                                                                
                                                                                
                                                               |
   |               |     CoalesceBatchesExec: target_batch_size=2               
                                                                                
                                                                                
                                                               |
   |               |       RepartitionExec: partitioning=Hash([a@1], 2), 
input_partitions=1                                                              
                                                                                
                                                                      |
   |               |         ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as 
b, c@3 as c, d@4 as d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND 
UNBOUNDED FOLLOWING@5 as rn1]                                                   
                                                                      |
   |               |           BoundedWindowAggExec: wdw=[row_number() ROWS 
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS 
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS 
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]    |
   |               |             DataSourceExec: file_groups={1 group: 
[[Users/gene.bordegaray/go/src/github.com/DataDog/datafusion/datafusion/core/tests/data/window_2.csv]]},
 projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 
ASC NULLS LAST], file_type=csv, has_header=true |
   |               |     CoalesceBatchesExec: target_batch_size=2               
                                                                                
                                                                                
                                                               |
   |               |       RepartitionExec: partitioning=Hash([a@1], 2), 
input_partitions=1                                                              
                                                                                
                                                                      |
   |               |         ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as 
b, c@3 as c, d@4 as d, row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND 
UNBOUNDED FOLLOWING@5 as rn1]                                                   
                                                                      |
   |               |           BoundedWindowAggExec: wdw=[row_number() ROWS 
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS 
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS 
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]    |
   |               |             DataSourceExec: file_groups={1 group: 
[[Users/gene.bordegaray/go/src/github.com/DataDog/datafusion/datafusion/core/tests/data/window_2.csv]]},
 projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 
ASC NULLS LAST], file_type=csv, has_header=true |
   |               |                                                            
                                                                                
                                                                                
                                                               |
   
+---------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
   2 row(s) fetched.
   Elapsed 0.003 seconds.
   
   +----+---+---+---+---+-----+----+---+---+----+---+-----+
   | a0 | a | b | c | d | rn1 | a0 | a | b | c  | d | rn1 |
   +----+---+---+---+---+-----+----+---+---+----+---+-----+
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 0  | 0 | 1   |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 1  | 2 | 2   |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 2  | 0 | 3   |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 3  | 0 | 4   |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 4  | 1 | 5   |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 5  | 1 | 6   |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 6  | 0 | 7   |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 7  | 2 | 8   |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 8  | 1 | 9   |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 9  | 4 | 10  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 10 | 4 | 11  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 11 | 2 | 12  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 12 | 2 | 13  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 13 | 1 | 14  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 14 | 2 | 15  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 15 | 3 | 16  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 16 | 3 | 17  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 17 | 2 | 18  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 18 | 1 | 19  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 19 | 4 | 20  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 20 | 0 | 21  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 21 | 3 | 22  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 22 | 0 | 23  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 23 | 0 | 24  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 0 | 24 | 4 | 25  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 1 | 25 | 0 | 26  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 1 | 26 | 2 | 27  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 1 | 27 | 0 | 28  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 1 | 28 | 1 | 29  |
   | 1  | 0 | 0 | 0 | 0 | 1   | 1  | 0 | 1 | 29 | 1 | 30  |
   +----+---+---+---+---+-----+----+---+---+----+---+-----+
   30 row(s) fetched.
   Elapsed 0.003 seconds.
   ```
   
   The physical plan here has hash RepartitionExec nodes (one per join input) and 
no SortExec node above them. Despite this, the results above are still sorted, 
which indicates that the input ordering was preserved.
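
A toy sketch in Python (not DataFusion code; the rows and the `hash_repartition` helper are hypothetical) of one plausible reason the output can stay sorted without a SortExec. It assumes what the plan shows, `input_partitions=1`: hash-routing a single sorted stream keeps each output partition sorted, so an order-preserving merge can rebuild the global order.

```python
import heapq


def hash_repartition(rows, key, num_partitions):
    """Route each row to a partition by hashing its key, keeping arrival order."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[hash(key(row)) % num_partitions].append(row)
    return partitions


# Hypothetical (a, b, c) rows, already sorted, arriving from ONE input partition.
rows = [(0, 0, 0), (0, 1, 1), (1, 0, 2), (2, 0, 3), (3, 1, 4)]

# Hash on column `a` into 2 partitions, loosely mirroring Hash([a@1], 2).
parts = hash_repartition(rows, key=lambda r: r[0], num_partitions=2)

# Each partition is a subsequence of the sorted input, so it is still sorted.
assert all(p == sorted(p) for p in parts)

# A k-way merge of sorted partitions restores the global order.
merged = list(heapq.merge(*parts))
assert merged == rows
```

With more than one input partition, rows from different inputs would interleave in arrival order and this guarantee would no longer hold in the sketch.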
   
   The plan above does not show any metadata indicating that order is preserved. 
The EXPLAIN output should display this but currently does not. It is tracked in 
the field `maintains_input_order`. I can create an issue for this and link it 
here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


