This is an automated email from the ASF dual-hosted git repository.

xudong963 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 6d5f5cdf28 Adjust physical optimizer rule order (#15040)
6d5f5cdf28 is described below

commit 6d5f5cdf2889d1f2e5bcb708087b5e2a4b1da011
Author: xudong.w <[email protected]>
AuthorDate: Thu Mar 6 18:13:52 2025 +0700

    Adjust physical optimizer rule order (#15040)
---
 datafusion/physical-optimizer/src/optimizer.rs |  8 ++++----
 datafusion/sqllogictest/test_files/explain.slt |  6 +++---
 datafusion/sqllogictest/test_files/window.slt  | 26 ++++++++++++--------------
 3 files changed, 19 insertions(+), 21 deletions(-)

diff --git a/datafusion/physical-optimizer/src/optimizer.rs 
b/datafusion/physical-optimizer/src/optimizer.rs
index 88f11f5349..bab31150e2 100644
--- a/datafusion/physical-optimizer/src/optimizer.rs
+++ b/datafusion/physical-optimizer/src/optimizer.rs
@@ -121,6 +121,10 @@ impl PhysicalOptimizer {
             // into an `order by max(x) limit y`. In this case it will copy 
the limit value down
             // to the aggregation, allowing it to use only y number of 
accumulators.
             Arc::new(TopKAggregation::new()),
+            // The LimitPushdown rule tries to push limits down as far as 
possible,
+            // replacing operators with fetching variants, or adding limits
+            // past operators that support limit pushdown.
+            Arc::new(LimitPushdown::new()),
             // The ProjectionPushdown rule tries to push projections towards
             // the sources in the execution plan. As a result of this process,
             // a projection can disappear if it reaches the source providers, 
and
@@ -128,10 +132,6 @@ impl PhysicalOptimizer {
             // are not present, the load of executors such as join or union 
will be
             // reduced by narrowing their input tables.
             Arc::new(ProjectionPushdown::new()),
-            // The LimitPushdown rule tries to push limits down as far as 
possible,
-            // replacing operators with fetching variants, or adding limits
-            // past operators that support limit pushdown.
-            Arc::new(LimitPushdown::new()),
             // The SanityCheckPlan rule checks whether the order and
             // distribution requirements of each node in the plan
             // is satisfied. It will also reject non-runnable query
diff --git a/datafusion/sqllogictest/test_files/explain.slt 
b/datafusion/sqllogictest/test_files/explain.slt
index 16c61a1db6..d32ddd1512 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -247,8 +247,8 @@ physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after coalesce_batches SAME TEXT AS ABOVE
 physical_plan after OutputRequirements DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, 
c], file_type=csv, has_header=true
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
-physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after LimitPushdown SAME TEXT AS ABOVE
+physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
 physical_plan DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, 
c], file_type=csv, has_header=true
 physical_plan_with_stats DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, 
c], file_type=csv, has_header=true, statistics=[Rows=Absent, Bytes=Absent, 
[(Col[0]:),(Col[1]:),(Col[2]:)]]
@@ -323,8 +323,8 @@ physical_plan after OutputRequirements
 01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8), 
Bytes=Exact(671), 
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), 
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
-physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), 
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
 physical_plan DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), 
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 physical_plan_with_schema DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, 
smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, 
double_col:Float64;N, date_string_col:BinaryView;N, [...]
@@ -363,8 +363,8 @@ physical_plan after OutputRequirements
 01)GlobalLimitExec: skip=0, fetch=10
 02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
file_type=parquet
 physical_plan after LimitAggregation SAME TEXT AS ABOVE
-physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
file_type=parquet
+physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
 physical_plan DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
file_type=parquet
 physical_plan_with_stats DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), 
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
diff --git a/datafusion/sqllogictest/test_files/window.slt 
b/datafusion/sqllogictest/test_files/window.slt
index 61bb2f0227..1a9acc0f53 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -2833,13 +2833,12 @@ logical_plan
 06)----------Projection: CAST(annotated_data_infinite.inc_col AS Int64) AS 
__common_expr_1, annotated_data_infinite.ts, annotated_data_infinite.inc_col
 07)------------TableScan: annotated_data_infinite projection=[ts, inc_col]
 physical_plan
-01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, 
count2@3 as count2]
-02)--ProjectionExec: expr=[sum(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@5 as sum1, sum(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING@3 as sum2, count(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@6 as count1, count(ann [...]
-03)----GlobalLimitExec: skip=0, fetch=5
-04)------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)) [...]
-05)--------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64( [...]
-06)----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as 
__common_expr_1, ts@0 as ts, inc_col@1 as inc_col]
-07)------------StreamingTableExec: partition_sizes=1, projection=[ts, 
inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
+01)ProjectionExec: expr=[sum(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@5 as sum1, sum(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING@3 as sum2, count(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@6 as count1, count(annot [...]
+02)--GlobalLimitExec: skip=0, fetch=5
+03)----BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)),  [...]
+04)------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NU [...]
+05)--------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as __common_expr_1, 
ts@0 as ts, inc_col@1 as inc_col]
+06)----------StreamingTableExec: partition_sizes=1, projection=[ts, inc_col], 
infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
 
 query IIII
 SELECT
@@ -2879,13 +2878,12 @@ logical_plan
 06)----------Projection: CAST(annotated_data_infinite.inc_col AS Int64) AS 
__common_expr_1, annotated_data_infinite.ts, annotated_data_infinite.inc_col
 07)------------TableScan: annotated_data_infinite projection=[ts, inc_col]
 physical_plan
-01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, 
count2@3 as count2]
-02)--ProjectionExec: expr=[sum(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@5 as sum1, sum(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING@3 as sum2, count(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@6 as count1, count(ann [...]
-03)----GlobalLimitExec: skip=0, fetch=5
-04)------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)) [...]
-05)--------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64( [...]
-06)----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as 
__common_expr_1, ts@0 as ts, inc_col@1 as inc_col]
-07)------------StreamingTableExec: partition_sizes=1, projection=[ts, 
inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
+01)ProjectionExec: expr=[sum(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@5 as sum1, sum(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING@3 as sum2, count(annotated_data_infinite.inc_col) ORDER BY 
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING@6 as count1, count(annot [...]
+02)--GlobalLimitExec: skip=0, fetch=5
+03)----BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING 
AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)),  [...]
+04)------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) ORDER 
BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 
UNBOUNDED FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col) 
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING 
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NU [...]
+05)--------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as __common_expr_1, 
ts@0 as ts, inc_col@1 as inc_col]
+06)----------StreamingTableExec: partition_sizes=1, projection=[ts, inc_col], 
infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
 
 
 query IIII


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to