This is an automated email from the ASF dual-hosted git repository.
xudong963 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 6d5f5cdf28 Adjust physical optimizer rule order (#15040)
6d5f5cdf28 is described below
commit 6d5f5cdf2889d1f2e5bcb708087b5e2a4b1da011
Author: xudong.w <[email protected]>
AuthorDate: Thu Mar 6 18:13:52 2025 +0700
Adjust physical optimizer rule order (#15040)
---
datafusion/physical-optimizer/src/optimizer.rs | 8 ++++----
datafusion/sqllogictest/test_files/explain.slt | 6 +++---
datafusion/sqllogictest/test_files/window.slt | 26 ++++++++++++--------------
3 files changed, 19 insertions(+), 21 deletions(-)
diff --git a/datafusion/physical-optimizer/src/optimizer.rs b/datafusion/physical-optimizer/src/optimizer.rs
index 88f11f5349..bab31150e2 100644
--- a/datafusion/physical-optimizer/src/optimizer.rs
+++ b/datafusion/physical-optimizer/src/optimizer.rs
@@ -121,6 +121,10 @@ impl PhysicalOptimizer {
// into an `order by max(x) limit y`. In this case it will copy the limit value down
// to the aggregation, allowing it to use only y number of accumulators.
Arc::new(TopKAggregation::new()),
+ // The LimitPushdown rule tries to push limits down as far as possible,
+ // replacing operators with fetching variants, or adding limits
+ // past operators that support limit pushdown.
+ Arc::new(LimitPushdown::new()),
// The ProjectionPushdown rule tries to push projections towards
// the sources in the execution plan. As a result of this process,
// a projection can disappear if it reaches the source providers, and
@@ -128,10 +132,6 @@ impl PhysicalOptimizer {
// are not present, the load of executors such as join or union will be
// reduced by narrowing their input tables.
Arc::new(ProjectionPushdown::new()),
- // The LimitPushdown rule tries to push limits down as far as possible,
- // replacing operators with fetching variants, or adding limits
- // past operators that support limit pushdown.
- Arc::new(LimitPushdown::new()),
// The SanityCheckPlan rule checks whether the order and
// distribution requirements of each node in the plan
// is satisfied. It will also reject non-runnable query
diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt
index 16c61a1db6..d32ddd1512 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -247,8 +247,8 @@ physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after coalesce_batches SAME TEXT AS ABOVE
physical_plan after OutputRequirements DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b,
c], file_type=csv, has_header=true
physical_plan after LimitAggregation SAME TEXT AS ABOVE
-physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after LimitPushdown SAME TEXT AS ABOVE
+physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b,
c], file_type=csv, has_header=true
physical_plan_with_stats DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b,
c], file_type=csv, has_header=true, statistics=[Rows=Absent, Bytes=Absent,
[(Col[0]:),(Col[1]:),(Col[2]:)]]
@@ -323,8 +323,8 @@ physical_plan after OutputRequirements
01)GlobalLimitExec: skip=0, fetch=10, statistics=[Rows=Exact(8),
Bytes=Exact(671),
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]},
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10,
file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671),
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
physical_plan after LimitAggregation SAME TEXT AS ABOVE
-physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after LimitPushdown DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]},
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10,
file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671),
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
+physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]},
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10,
file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671),
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
physical_plan_with_schema DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]},
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10,
file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N,
smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N,
double_col:Float64;N, date_string_col:BinaryView;N, [...]
@@ -363,8 +363,8 @@ physical_plan after OutputRequirements
01)GlobalLimitExec: skip=0, fetch=10
02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]},
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10,
file_type=parquet
physical_plan after LimitAggregation SAME TEXT AS ABOVE
-physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after LimitPushdown DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]},
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10,
file_type=parquet
+physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
physical_plan DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]},
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10,
file_type=parquet
physical_plan_with_stats DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]},
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col,
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10,
file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671),
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt
index 61bb2f0227..1a9acc0f53 100644
--- a/datafusion/sqllogictest/test_files/window.slt
+++ b/datafusion/sqllogictest/test_files/window.slt
@@ -2833,13 +2833,12 @@ logical_plan
06)----------Projection: CAST(annotated_data_infinite.inc_col AS Int64) AS
__common_expr_1, annotated_data_infinite.ts, annotated_data_infinite.inc_col
07)------------TableScan: annotated_data_infinite projection=[ts, inc_col]
physical_plan
-01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1,
count2@3 as count2]
-02)--ProjectionExec: expr=[sum(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING@5 as sum1, sum(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING@3 as sum2, count(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING@6 as count1, count(ann [...]
-03)----GlobalLimitExec: skip=0, fetch=5
-04)------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) ORDER
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col) ORDER
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(NULL)) [...]
-05)--------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col)
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col)
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64( [...]
-06)----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as
__common_expr_1, ts@0 as ts, inc_col@1 as inc_col]
-07)------------StreamingTableExec: partition_sizes=1, projection=[ts,
inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
+01)ProjectionExec: expr=[sum(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING@5 as sum1, sum(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING@3 as sum2, count(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING@6 as count1, count(annot [...]
+02)--GlobalLimitExec: skip=0, fetch=5
+03)----BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) ORDER
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col) ORDER
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(NULL)), [...]
+04)------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) ORDER
BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col)
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(NU [...]
+05)--------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as __common_expr_1,
ts@0 as ts, inc_col@1 as inc_col]
+06)----------StreamingTableExec: partition_sizes=1, projection=[ts, inc_col],
infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
query IIII
SELECT
@@ -2879,13 +2878,12 @@ logical_plan
06)----------Projection: CAST(annotated_data_infinite.inc_col AS Int64) AS
__common_expr_1, annotated_data_infinite.ts, annotated_data_infinite.inc_col
07)------------TableScan: annotated_data_infinite projection=[ts, inc_col]
physical_plan
-01)ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1,
count2@3 as count2]
-02)--ProjectionExec: expr=[sum(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING@5 as sum1, sum(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING@3 as sum2, count(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING@6 as count1, count(ann [...]
-03)----GlobalLimitExec: skip=0, fetch=5
-04)------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) ORDER
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col) ORDER
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(NULL)) [...]
-05)--------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col)
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col)
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64( [...]
-06)----------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as
__common_expr_1, ts@0 as ts, inc_col@1 as inc_col]
-07)------------StreamingTableExec: partition_sizes=1, projection=[ts,
inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
+01)ProjectionExec: expr=[sum(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING@5 as sum1, sum(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING@3 as sum2, count(annotated_data_infinite.inc_col) ORDER BY
[annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING@6 as count1, count(annot [...]
+02)--GlobalLimitExec: skip=0, fetch=5
+03)----BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) ORDER
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col) ORDER
BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING
AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(NULL)), [...]
+04)------BoundedWindowAggExec: wdw=[sum(annotated_data_infinite.inc_col) ORDER
BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND
UNBOUNDED FOLLOWING: Ok(Field { name: "sum(annotated_data_infinite.inc_col)
ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING
AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0,
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows,
start_bound: Preceding(UInt64(NU [...]
+05)--------ProjectionExec: expr=[CAST(inc_col@1 AS Int64) as __common_expr_1,
ts@0 as ts, inc_col@1 as inc_col]
+06)----------StreamingTableExec: partition_sizes=1, projection=[ts, inc_col],
infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST]
query IIII
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]