This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 060e67e8ff cleanup(tests): Move tests from `push_down_projections.rs`
to `optimize_projections.rs` (#10071)
060e67e8ff is described below
commit 060e67e8ff3b57ab695daeb28cf7175c4e2c568c
Author: Kaviraj Kanagaraj <[email protected]>
AuthorDate: Mon Apr 15 13:40:19 2024 +0200
cleanup(tests): Move tests from `push_down_projections.rs` to
`optimize_projections.rs` (#10071)
* cleanup(tests): Move tests from `push_down_projection.rs` to
`optimize_projections.rs`
Fixes: #9978
PR #8340 removed the `push_down_projection` rule in favour of
`optimize_projections.rs`, but left its tests in `push_down_projection.rs`.
This PR moves those tests into `optimize_projections.rs` as well.
Signed-off-by: Kaviraj <[email protected]>
* remove the file `push_down_projection.rs`
Signed-off-by: Kaviraj <[email protected]>
* Fix method signatures that are broken by other PRs
Signed-off-by: Kaviraj <[email protected]>
* remove the `push_down_projection` module declaration from `lib.rs`
Signed-off-by: Kaviraj <[email protected]>
---------
Signed-off-by: Kaviraj <[email protected]>
---
datafusion/optimizer/src/lib.rs | 1 -
datafusion/optimizer/src/optimize_projections.rs | 637 +++++++++++++++++++++-
datafusion/optimizer/src/push_down_projection.rs | 660 -----------------------
3 files changed, 631 insertions(+), 667 deletions(-)
diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs
index f1f49727c3..9176d67c1d 100644
--- a/datafusion/optimizer/src/lib.rs
+++ b/datafusion/optimizer/src/lib.rs
@@ -47,7 +47,6 @@ pub mod optimizer;
pub mod propagate_empty_relation;
pub mod push_down_filter;
pub mod push_down_limit;
-pub mod push_down_projection;
pub mod replace_distinct_aggregate;
pub mod rewrite_disjunctive_predicate;
pub mod scalar_subquery_to_join;
diff --git a/datafusion/optimizer/src/optimize_projections.rs b/datafusion/optimizer/src/optimize_projections.rs
index 6967b28f30..b54fb248a7 100644
--- a/datafusion/optimizer/src/optimize_projections.rs
+++ b/datafusion/optimizer/src/optimize_projections.rs
@@ -925,20 +925,32 @@ fn is_projection_unnecessary(input: &LogicalPlan, proj_exprs: &[Expr]) -> Result
#[cfg(test)]
mod tests {
+ use std::collections::HashMap;
use std::fmt::Formatter;
use std::sync::Arc;
+ use std::vec;
use crate::optimize_projections::OptimizeProjections;
+ use crate::optimizer::Optimizer;
use crate::test::{
- assert_optimized_plan_eq, test_table_scan, test_table_scan_with_name,
+ assert_fields_eq, assert_optimized_plan_eq, scan_empty,
test_table_scan,
+ test_table_scan_fields, test_table_scan_with_name,
};
+ use crate::{OptimizerContext, OptimizerRule};
use arrow::datatypes::{DataType, Field, Schema};
- use datafusion_common::{Column, DFSchemaRef, JoinType, Result,
TableReference};
+ use datafusion_common::{
+ Column, DFSchema, DFSchemaRef, JoinType, Result, TableReference,
+ };
use datafusion_expr::{
- binary_expr, build_join_schema, col, count, lit,
- logical_plan::builder::LogicalPlanBuilder, not, table_scan, try_cast,
when,
- BinaryExpr, Expr, Extension, Like, LogicalPlan, Operator,
- UserDefinedLogicalNodeCore,
+ binary_expr, build_join_schema,
+ builder::table_scan_with_filters,
+ col, count,
+ expr::{self, Cast},
+ lit,
+ logical_plan::{builder::LogicalPlanBuilder, table_scan},
+ max, min, not, try_cast, when, AggregateFunction, BinaryExpr, Expr,
Extension,
+ Like, LogicalPlan, Operator, Projection, UserDefinedLogicalNodeCore,
WindowFrame,
+ WindowFunctionDefinition,
};
fn assert_optimized_plan_equal(plan: LogicalPlan, expected: &str) ->
Result<()> {
@@ -1466,4 +1478,617 @@ mod tests {
\n TableScan: r projection=[a]";
assert_optimized_plan_equal(plan, expected)
}
+
+ #[test]
+ fn aggregate_no_group_by() -> Result<()> {
+ let table_scan = test_table_scan()?;
+
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .aggregate(Vec::<Expr>::new(), vec![max(col("b"))])?
+ .build()?;
+
+ let expected = "Aggregate: groupBy=[[]], aggr=[[MAX(test.b)]]\
+ \n TableScan: test projection=[b]";
+
+ assert_optimized_plan_equal(plan, expected)
+ }
+
+ #[test]
+ fn aggregate_group_by() -> Result<()> {
+ let table_scan = test_table_scan()?;
+
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .aggregate(vec![col("c")], vec![max(col("b"))])?
+ .build()?;
+
+ let expected = "Aggregate: groupBy=[[test.c]], aggr=[[MAX(test.b)]]\
+ \n TableScan: test projection=[b, c]";
+
+ assert_optimized_plan_equal(plan, expected)
+ }
+
+ #[test]
+ fn aggregate_group_by_with_table_alias() -> Result<()> {
+ let table_scan = test_table_scan()?;
+
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .alias("a")?
+ .aggregate(vec![col("c")], vec![max(col("b"))])?
+ .build()?;
+
+ let expected = "Aggregate: groupBy=[[a.c]], aggr=[[MAX(a.b)]]\
+ \n SubqueryAlias: a\
+ \n TableScan: test projection=[b, c]";
+
+ assert_optimized_plan_equal(plan, expected)
+ }
+
+ #[test]
+ fn aggregate_no_group_by_with_filter() -> Result<()> {
+ let table_scan = test_table_scan()?;
+
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .filter(col("c").gt(lit(1)))?
+ .aggregate(Vec::<Expr>::new(), vec![max(col("b"))])?
+ .build()?;
+
+ let expected = "Aggregate: groupBy=[[]], aggr=[[MAX(test.b)]]\
+ \n Projection: test.b\
+ \n Filter: test.c > Int32(1)\
+ \n TableScan: test projection=[b, c]";
+
+ assert_optimized_plan_equal(plan, expected)
+ }
+
+ #[test]
+ fn aggregate_with_periods() -> Result<()> {
+ let schema = Schema::new(vec![Field::new("tag.one", DataType::Utf8,
false)]);
+
+ // Build a plan that looks as follows (note "tag.one" is a column named
+ // "tag.one", not a column named "one" in a table named "tag"):
+ //
+ // Projection: tag.one
+ // Aggregate: groupBy=[], aggr=[MAX("tag.one") AS "tag.one"]
+ // TableScan
+ let plan = table_scan(Some("m4"), &schema, None)?
+ .aggregate(
+ Vec::<Expr>::new(),
+
vec![max(col(Column::new_unqualified("tag.one"))).alias("tag.one")],
+ )?
+ .project([col(Column::new_unqualified("tag.one"))])?
+ .build()?;
+
+ let expected = "\
+ Aggregate: groupBy=[[]], aggr=[[MAX(m4.tag.one) AS tag.one]]\
+ \n TableScan: m4 projection=[tag.one]";
+
+ assert_optimized_plan_equal(plan, expected)
+ }
+
+ #[test]
+ fn redundant_project() -> Result<()> {
+ let table_scan = test_table_scan()?;
+
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .project(vec![col("a"), col("b"), col("c")])?
+ .project(vec![col("a"), col("c"), col("b")])?
+ .build()?;
+ let expected = "Projection: test.a, test.c, test.b\
+ \n TableScan: test projection=[a, b, c]";
+
+ assert_optimized_plan_equal(plan, expected)
+ }
+
+ #[test]
+ fn reorder_scan() -> Result<()> {
+ let schema = Schema::new(test_table_scan_fields());
+
+ let plan = table_scan(Some("test"), &schema, Some(vec![1, 0,
2]))?.build()?;
+ let expected = "TableScan: test projection=[b, a, c]";
+
+ assert_optimized_plan_equal(plan, expected)
+ }
+
+ #[test]
+ fn reorder_scan_projection() -> Result<()> {
+ let schema = Schema::new(test_table_scan_fields());
+
+ let plan = table_scan(Some("test"), &schema, Some(vec![1, 0, 2]))?
+ .project(vec![col("a"), col("b")])?
+ .build()?;
+ let expected = "Projection: test.a, test.b\
+ \n TableScan: test projection=[b, a]";
+
+ assert_optimized_plan_equal(plan, expected)
+ }
+
+ #[test]
+ fn reorder_projection() -> Result<()> {
+ let table_scan = test_table_scan()?;
+
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .project(vec![col("c"), col("b"), col("a")])?
+ .build()?;
+ let expected = "Projection: test.c, test.b, test.a\
+ \n TableScan: test projection=[a, b, c]";
+
+ assert_optimized_plan_equal(plan, expected)
+ }
+
+ #[test]
+ fn noncontinuous_redundant_projection() -> Result<()> {
+ let table_scan = test_table_scan()?;
+
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .project(vec![col("c"), col("b"), col("a")])?
+ .filter(col("c").gt(lit(1)))?
+ .project(vec![col("c"), col("a"), col("b")])?
+ .filter(col("b").gt(lit(1)))?
+ .filter(col("a").gt(lit(1)))?
+ .project(vec![col("a"), col("c"), col("b")])?
+ .build()?;
+ let expected = "Projection: test.a, test.c, test.b\
+ \n Filter: test.a > Int32(1)\
+ \n Filter: test.b > Int32(1)\
+ \n Projection: test.c, test.a, test.b\
+ \n Filter: test.c > Int32(1)\
+ \n Projection: test.c, test.b, test.a\
+ \n TableScan: test projection=[a, b, c]";
+ assert_optimized_plan_equal(plan, expected)
+ }
+
+ #[test]
+ fn join_schema_trim_full_join_column_projection() -> Result<()> {
+ let table_scan = test_table_scan()?;
+
+ let schema = Schema::new(vec![Field::new("c1", DataType::UInt32,
false)]);
+ let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?;
+
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .join(table2_scan, JoinType::Left, (vec!["a"], vec!["c1"]), None)?
+ .project(vec![col("a"), col("b"), col("c1")])?
+ .build()?;
+
+ // make sure projections are pushed down to both table scans
+ let expected = "Left Join: test.a = test2.c1\
+ \n TableScan: test projection=[a, b]\
+ \n TableScan: test2 projection=[c1]";
+
+ let optimized_plan = optimize(plan)?;
+ let formatted_plan = format!("{optimized_plan:?}");
+ assert_eq!(formatted_plan, expected);
+
+ // make sure schema for join node include both join columns
+ let optimized_join = optimized_plan;
+ assert_eq!(
+ **optimized_join.schema(),
+ DFSchema::new_with_metadata(
+ vec![
+ (
+ Some("test".into()),
+ Arc::new(Field::new("a", DataType::UInt32, false))
+ ),
+ (
+ Some("test".into()),
+ Arc::new(Field::new("b", DataType::UInt32, false))
+ ),
+ (
+ Some("test2".into()),
+ Arc::new(Field::new("c1", DataType::UInt32, true))
+ ),
+ ],
+ HashMap::new()
+ )?,
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn join_schema_trim_partial_join_column_projection() -> Result<()> {
+ // test join column push down without explicit column projections
+
+ let table_scan = test_table_scan()?;
+
+ let schema = Schema::new(vec![Field::new("c1", DataType::UInt32,
false)]);
+ let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?;
+
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .join(table2_scan, JoinType::Left, (vec!["a"], vec!["c1"]), None)?
+ // projecting joined column `a` should push the right side column
`c1` projection as
+ // well into test2 table even though `c1` is not referenced in
projection.
+ .project(vec![col("a"), col("b")])?
+ .build()?;
+
+ // make sure projections are pushed down to both table scans
+ let expected = "Projection: test.a, test.b\
+ \n Left Join: test.a = test2.c1\
+ \n TableScan: test projection=[a, b]\
+ \n TableScan: test2 projection=[c1]";
+
+ let optimized_plan = optimize(plan)?;
+ let formatted_plan = format!("{optimized_plan:?}");
+ assert_eq!(formatted_plan, expected);
+
+ // make sure schema for join node include both join columns
+ let optimized_join = optimized_plan.inputs()[0];
+ assert_eq!(
+ **optimized_join.schema(),
+ DFSchema::new_with_metadata(
+ vec![
+ (
+ Some("test".into()),
+ Arc::new(Field::new("a", DataType::UInt32, false))
+ ),
+ (
+ Some("test".into()),
+ Arc::new(Field::new("b", DataType::UInt32, false))
+ ),
+ (
+ Some("test2".into()),
+ Arc::new(Field::new("c1", DataType::UInt32, true))
+ ),
+ ],
+ HashMap::new()
+ )?,
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn join_schema_trim_using_join() -> Result<()> {
+ // shared join columns from using join should be pushed to both sides
+
+ let table_scan = test_table_scan()?;
+
+ let schema = Schema::new(vec![Field::new("a", DataType::UInt32,
false)]);
+ let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?;
+
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .join_using(table2_scan, JoinType::Left, vec!["a"])?
+ .project(vec![col("a"), col("b")])?
+ .build()?;
+
+ // make sure projections are pushed down to table scan
+ let expected = "Projection: test.a, test.b\
+ \n Left Join: Using test.a = test2.a\
+ \n TableScan: test projection=[a, b]\
+ \n TableScan: test2 projection=[a]";
+
+ let optimized_plan = optimize(plan)?;
+ let formatted_plan = format!("{optimized_plan:?}");
+ assert_eq!(formatted_plan, expected);
+
+ // make sure schema for join node include both join columns
+ let optimized_join = optimized_plan.inputs()[0];
+ assert_eq!(
+ **optimized_join.schema(),
+ DFSchema::new_with_metadata(
+ vec![
+ (
+ Some("test".into()),
+ Arc::new(Field::new("a", DataType::UInt32, false))
+ ),
+ (
+ Some("test".into()),
+ Arc::new(Field::new("b", DataType::UInt32, false))
+ ),
+ (
+ Some("test2".into()),
+ Arc::new(Field::new("a", DataType::UInt32, true))
+ ),
+ ],
+ HashMap::new()
+ )?,
+ );
+
+ Ok(())
+ }
+
+ #[test]
+ fn cast() -> Result<()> {
+ let table_scan = test_table_scan()?;
+
+ let projection = LogicalPlanBuilder::from(table_scan)
+ .project(vec![Expr::Cast(Cast::new(
+ Box::new(col("c")),
+ DataType::Float64,
+ ))])?
+ .build()?;
+
+ let expected = "Projection: CAST(test.c AS Float64)\
+ \n TableScan: test projection=[c]";
+
+ assert_optimized_plan_equal(projection, expected)
+ }
+
+ #[test]
+ fn table_scan_projected_schema() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ let plan = LogicalPlanBuilder::from(test_table_scan()?)
+ .project(vec![col("a"), col("b")])?
+ .build()?;
+
+ assert_eq!(3, table_scan.schema().fields().len());
+ assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
+ assert_fields_eq(&plan, vec!["a", "b"]);
+
+ let expected = "TableScan: test projection=[a, b]";
+
+ assert_optimized_plan_equal(plan, expected)
+ }
+
+ #[test]
+ fn table_scan_projected_schema_non_qualified_relation() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ let input_schema = table_scan.schema();
+ assert_eq!(3, input_schema.fields().len());
+ assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
+
+ // Build the LogicalPlan directly (don't use PlanBuilder), so
+ // that the Column references are unqualified (e.g. their
+ // relation is `None`). PlanBuilder resolves the expressions
+ let expr = vec![col("test.a"), col("test.b")];
+ let plan =
+ LogicalPlan::Projection(Projection::try_new(expr,
Arc::new(table_scan))?);
+
+ assert_fields_eq(&plan, vec!["a", "b"]);
+
+ let expected = "TableScan: test projection=[a, b]";
+
+ assert_optimized_plan_equal(plan, expected)
+ }
+
+ #[test]
+ fn table_limit() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ assert_eq!(3, table_scan.schema().fields().len());
+ assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
+
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .project(vec![col("c"), col("a")])?
+ .limit(0, Some(5))?
+ .build()?;
+
+ assert_fields_eq(&plan, vec!["c", "a"]);
+
+ let expected = "Limit: skip=0, fetch=5\
+ \n Projection: test.c, test.a\
+ \n TableScan: test projection=[a, c]";
+
+ assert_optimized_plan_equal(plan, expected)
+ }
+
+ #[test]
+ fn table_scan_without_projection() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ let plan = LogicalPlanBuilder::from(table_scan).build()?;
+ // should expand projection to all columns without projection
+ let expected = "TableScan: test projection=[a, b, c]";
+ assert_optimized_plan_equal(plan, expected)
+ }
+
+ #[test]
+ fn table_scan_with_literal_projection() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .project(vec![lit(1_i64), lit(2_i64)])?
+ .build()?;
+ let expected = "Projection: Int64(1), Int64(2)\
+ \n TableScan: test projection=[]";
+ assert_optimized_plan_equal(plan, expected)
+ }
+
+ /// tests that it removes unused columns in projections
+ #[test]
+ fn table_unused_column() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ assert_eq!(3, table_scan.schema().fields().len());
+ assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
+
+ // we never use "b" in the first projection => remove it
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .project(vec![col("c"), col("a"), col("b")])?
+ .filter(col("c").gt(lit(1)))?
+ .aggregate(vec![col("c")], vec![max(col("a"))])?
+ .build()?;
+
+ assert_fields_eq(&plan, vec!["c", "MAX(test.a)"]);
+
+ let plan = optimize(plan).expect("failed to optimize plan");
+ let expected = "\
+ Aggregate: groupBy=[[test.c]], aggr=[[MAX(test.a)]]\
+ \n Filter: test.c > Int32(1)\
+ \n Projection: test.c, test.a\
+ \n TableScan: test projection=[a, c]";
+
+ assert_optimized_plan_equal(plan, expected)
+ }
+
+ /// tests that it removes un-needed projections
+ #[test]
+ fn table_unused_projection() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ assert_eq!(3, table_scan.schema().fields().len());
+ assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
+
+ // there is no need for the first projection
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .project(vec![col("b")])?
+ .project(vec![lit(1).alias("a")])?
+ .build()?;
+
+ assert_fields_eq(&plan, vec!["a"]);
+
+ let expected = "\
+ Projection: Int32(1) AS a\
+ \n TableScan: test projection=[]";
+
+ assert_optimized_plan_equal(plan, expected)
+ }
+
+ #[test]
+ fn table_full_filter_pushdown() -> Result<()> {
+ let schema = Schema::new(test_table_scan_fields());
+
+ let table_scan = table_scan_with_filters(
+ Some("test"),
+ &schema,
+ None,
+ vec![col("b").eq(lit(1))],
+ )?
+ .build()?;
+ assert_eq!(3, table_scan.schema().fields().len());
+ assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
+
+ // there is no need for the first projection
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .project(vec![col("b")])?
+ .project(vec![lit(1).alias("a")])?
+ .build()?;
+
+ assert_fields_eq(&plan, vec!["a"]);
+
+ let expected = "\
+ Projection: Int32(1) AS a\
+ \n TableScan: test projection=[], full_filters=[b = Int32(1)]";
+
+ assert_optimized_plan_equal(plan, expected)
+ }
+
+ /// tests that optimizing twice yields same plan
+ #[test]
+ fn test_double_optimization() -> Result<()> {
+ let table_scan = test_table_scan()?;
+
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .project(vec![col("b")])?
+ .project(vec![lit(1).alias("a")])?
+ .build()?;
+
+ let optimized_plan1 = optimize(plan).expect("failed to optimize plan");
+ let optimized_plan2 =
+ optimize(optimized_plan1.clone()).expect("failed to optimize
plan");
+
+ let formatted_plan1 = format!("{optimized_plan1:?}");
+ let formatted_plan2 = format!("{optimized_plan2:?}");
+ assert_eq!(formatted_plan1, formatted_plan2);
+ Ok(())
+ }
+
+ /// tests that it removes an aggregate is never used downstream
+ #[test]
+ fn table_unused_aggregate() -> Result<()> {
+ let table_scan = test_table_scan()?;
+ assert_eq!(3, table_scan.schema().fields().len());
+ assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
+
+ // we never use "min(b)" => remove it
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .aggregate(vec![col("a"), col("c")], vec![max(col("b")),
min(col("b"))])?
+ .filter(col("c").gt(lit(1)))?
+ .project(vec![col("c"), col("a"), col("MAX(test.b)")])?
+ .build()?;
+
+ assert_fields_eq(&plan, vec!["c", "a", "MAX(test.b)"]);
+
+ let expected = "Projection: test.c, test.a, MAX(test.b)\
+ \n Filter: test.c > Int32(1)\
+ \n Aggregate: groupBy=[[test.a, test.c]], aggr=[[MAX(test.b)]]\
+ \n TableScan: test projection=[a, b, c]";
+
+ assert_optimized_plan_equal(plan, expected)
+ }
+
+ #[test]
+ fn aggregate_filter_pushdown() -> Result<()> {
+ let table_scan = test_table_scan()?;
+
+ let aggr_with_filter =
Expr::AggregateFunction(expr::AggregateFunction::new(
+ AggregateFunction::Count,
+ vec![col("b")],
+ false,
+ Some(Box::new(col("c").gt(lit(42)))),
+ None,
+ None,
+ ));
+
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .aggregate(
+ vec![col("a")],
+ vec![count(col("b")), aggr_with_filter.alias("count2")],
+ )?
+ .build()?;
+
+ let expected = "Aggregate: groupBy=[[test.a]], aggr=[[COUNT(test.b),
COUNT(test.b) FILTER (WHERE test.c > Int32(42)) AS count2]]\
+ \n TableScan: test projection=[a, b, c]";
+
+ assert_optimized_plan_equal(plan, expected)
+ }
+
+ #[test]
+ fn pushdown_through_distinct() -> Result<()> {
+ let table_scan = test_table_scan()?;
+
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .project(vec![col("a"), col("b")])?
+ .distinct()?
+ .project(vec![col("a")])?
+ .build()?;
+
+ let expected = "Projection: test.a\
+ \n Distinct:\
+ \n TableScan: test projection=[a, b]";
+
+ assert_optimized_plan_equal(plan, expected)
+ }
+
+ #[test]
+ fn test_window() -> Result<()> {
+ let table_scan = test_table_scan()?;
+
+ let max1 = Expr::WindowFunction(expr::WindowFunction::new(
+
WindowFunctionDefinition::AggregateFunction(AggregateFunction::Max),
+ vec![col("test.a")],
+ vec![col("test.b")],
+ vec![],
+ WindowFrame::new(None),
+ None,
+ ));
+
+ let max2 = Expr::WindowFunction(expr::WindowFunction::new(
+
WindowFunctionDefinition::AggregateFunction(AggregateFunction::Max),
+ vec![col("test.b")],
+ vec![],
+ vec![],
+ WindowFrame::new(None),
+ None,
+ ));
+ let col1 = col(max1.display_name()?);
+ let col2 = col(max2.display_name()?);
+
+ let plan = LogicalPlanBuilder::from(table_scan)
+ .window(vec![max1])?
+ .window(vec![max2])?
+ .project(vec![col1, col2])?
+ .build()?;
+
+ let expected = "Projection: MAX(test.a) PARTITION BY [test.b] ROWS
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, MAX(test.b) ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING\
+ \n WindowAggr: windowExpr=[[MAX(test.b) ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING]]\
+ \n Projection: test.b, MAX(test.a) PARTITION BY [test.b] ROWS
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING\
+ \n WindowAggr: windowExpr=[[MAX(test.a) PARTITION BY [test.b]
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]\
+ \n TableScan: test projection=[a, b]";
+
+ assert_optimized_plan_equal(plan, expected)
+ }
+
+ fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
+
+ fn optimize(plan: LogicalPlan) -> Result<LogicalPlan> {
+ let optimizer =
Optimizer::with_rules(vec![Arc::new(OptimizeProjections::new())]);
+ let optimized_plan =
+ optimizer.optimize(plan, &OptimizerContext::new(), observe)?;
+ Ok(optimized_plan)
+ }
}
diff --git a/datafusion/optimizer/src/push_down_projection.rs b/datafusion/optimizer/src/push_down_projection.rs
deleted file mode 100644
index 2f578094b3..0000000000
--- a/datafusion/optimizer/src/push_down_projection.rs
+++ /dev/null
@@ -1,660 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#[cfg(test)]
-mod tests {
- use std::collections::HashMap;
- use std::sync::Arc;
- use std::vec;
-
- use crate::optimize_projections::OptimizeProjections;
- use crate::optimizer::Optimizer;
- use crate::test::*;
- use crate::{OptimizerContext, OptimizerRule};
- use arrow::datatypes::{DataType, Field, Schema};
- use datafusion_common::{Column, DFSchema, Result};
- use datafusion_expr::builder::table_scan_with_filters;
- use datafusion_expr::expr::{self, Cast};
- use datafusion_expr::logical_plan::{
- builder::LogicalPlanBuilder, table_scan, JoinType,
- };
- use datafusion_expr::{
- col, count, lit, max, min, AggregateFunction, Expr, LogicalPlan,
Projection,
- WindowFrame, WindowFunctionDefinition,
- };
-
- #[test]
- fn aggregate_no_group_by() -> Result<()> {
- let table_scan = test_table_scan()?;
-
- let plan = LogicalPlanBuilder::from(table_scan)
- .aggregate(Vec::<Expr>::new(), vec![max(col("b"))])?
- .build()?;
-
- let expected = "Aggregate: groupBy=[[]], aggr=[[MAX(test.b)]]\
- \n TableScan: test projection=[b]";
-
- assert_optimized_plan_eq(plan, expected)
- }
-
- #[test]
- fn aggregate_group_by() -> Result<()> {
- let table_scan = test_table_scan()?;
-
- let plan = LogicalPlanBuilder::from(table_scan)
- .aggregate(vec![col("c")], vec![max(col("b"))])?
- .build()?;
-
- let expected = "Aggregate: groupBy=[[test.c]], aggr=[[MAX(test.b)]]\
- \n TableScan: test projection=[b, c]";
-
- assert_optimized_plan_eq(plan, expected)
- }
-
- #[test]
- fn aggregate_group_by_with_table_alias() -> Result<()> {
- let table_scan = test_table_scan()?;
-
- let plan = LogicalPlanBuilder::from(table_scan)
- .alias("a")?
- .aggregate(vec![col("c")], vec![max(col("b"))])?
- .build()?;
-
- let expected = "Aggregate: groupBy=[[a.c]], aggr=[[MAX(a.b)]]\
- \n SubqueryAlias: a\
- \n TableScan: test projection=[b, c]";
-
- assert_optimized_plan_eq(plan, expected)
- }
-
- #[test]
- fn aggregate_no_group_by_with_filter() -> Result<()> {
- let table_scan = test_table_scan()?;
-
- let plan = LogicalPlanBuilder::from(table_scan)
- .filter(col("c").gt(lit(1)))?
- .aggregate(Vec::<Expr>::new(), vec![max(col("b"))])?
- .build()?;
-
- let expected = "Aggregate: groupBy=[[]], aggr=[[MAX(test.b)]]\
- \n Projection: test.b\
- \n Filter: test.c > Int32(1)\
- \n TableScan: test projection=[b, c]";
-
- assert_optimized_plan_eq(plan, expected)
- }
-
- #[test]
- fn aggregate_with_periods() -> Result<()> {
- let schema = Schema::new(vec![Field::new("tag.one", DataType::Utf8,
false)]);
-
- // Build a plan that looks as follows (note "tag.one" is a column named
- // "tag.one", not a column named "one" in a table named "tag"):
- //
- // Projection: tag.one
- // Aggregate: groupBy=[], aggr=[MAX("tag.one") AS "tag.one"]
- // TableScan
- let plan = table_scan(Some("m4"), &schema, None)?
- .aggregate(
- Vec::<Expr>::new(),
-
vec![max(col(Column::new_unqualified("tag.one"))).alias("tag.one")],
- )?
- .project([col(Column::new_unqualified("tag.one"))])?
- .build()?;
-
- let expected = "\
- Aggregate: groupBy=[[]], aggr=[[MAX(m4.tag.one) AS tag.one]]\
- \n TableScan: m4 projection=[tag.one]";
-
- assert_optimized_plan_eq(plan, expected)
- }
-
- #[test]
- fn redundant_project() -> Result<()> {
- let table_scan = test_table_scan()?;
-
- let plan = LogicalPlanBuilder::from(table_scan)
- .project(vec![col("a"), col("b"), col("c")])?
- .project(vec![col("a"), col("c"), col("b")])?
- .build()?;
- let expected = "Projection: test.a, test.c, test.b\
- \n TableScan: test projection=[a, b, c]";
-
- assert_optimized_plan_eq(plan, expected)
- }
-
- #[test]
- fn reorder_scan() -> Result<()> {
- let schema = Schema::new(test_table_scan_fields());
-
- let plan = table_scan(Some("test"), &schema, Some(vec![1, 0,
2]))?.build()?;
- let expected = "TableScan: test projection=[b, a, c]";
-
- assert_optimized_plan_eq(plan, expected)
- }
-
- #[test]
- fn reorder_scan_projection() -> Result<()> {
- let schema = Schema::new(test_table_scan_fields());
-
- let plan = table_scan(Some("test"), &schema, Some(vec![1, 0, 2]))?
- .project(vec![col("a"), col("b")])?
- .build()?;
- let expected = "Projection: test.a, test.b\
- \n TableScan: test projection=[b, a]";
-
- assert_optimized_plan_eq(plan, expected)
- }
-
- #[test]
- fn reorder_projection() -> Result<()> {
- let table_scan = test_table_scan()?;
-
- let plan = LogicalPlanBuilder::from(table_scan)
- .project(vec![col("c"), col("b"), col("a")])?
- .build()?;
- let expected = "Projection: test.c, test.b, test.a\
- \n TableScan: test projection=[a, b, c]";
-
- assert_optimized_plan_eq(plan, expected)
- }
-
- #[test]
- fn noncontinuous_redundant_projection() -> Result<()> {
- let table_scan = test_table_scan()?;
-
- let plan = LogicalPlanBuilder::from(table_scan)
- .project(vec![col("c"), col("b"), col("a")])?
- .filter(col("c").gt(lit(1)))?
- .project(vec![col("c"), col("a"), col("b")])?
- .filter(col("b").gt(lit(1)))?
- .filter(col("a").gt(lit(1)))?
- .project(vec![col("a"), col("c"), col("b")])?
- .build()?;
- let expected = "Projection: test.a, test.c, test.b\
- \n Filter: test.a > Int32(1)\
- \n Filter: test.b > Int32(1)\
- \n Projection: test.c, test.a, test.b\
- \n Filter: test.c > Int32(1)\
- \n Projection: test.c, test.b, test.a\
- \n TableScan: test projection=[a, b, c]";
- assert_optimized_plan_eq(plan, expected)
- }
-
- #[test]
- fn join_schema_trim_full_join_column_projection() -> Result<()> {
- let table_scan = test_table_scan()?;
-
- let schema = Schema::new(vec![Field::new("c1", DataType::UInt32,
false)]);
- let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?;
-
- let plan = LogicalPlanBuilder::from(table_scan)
- .join(table2_scan, JoinType::Left, (vec!["a"], vec!["c1"]), None)?
- .project(vec![col("a"), col("b"), col("c1")])?
- .build()?;
-
- // make sure projections are pushed down to both table scans
- let expected = "Left Join: test.a = test2.c1\
- \n TableScan: test projection=[a, b]\
- \n TableScan: test2 projection=[c1]";
-
- let optimized_plan = optimize(plan)?;
- let formatted_plan = format!("{optimized_plan:?}");
- assert_eq!(formatted_plan, expected);
-
- // make sure schema for join node include both join columns
- let optimized_join = optimized_plan;
- assert_eq!(
- **optimized_join.schema(),
- DFSchema::new_with_metadata(
- vec![
- (
- Some("test".into()),
- Arc::new(Field::new("a", DataType::UInt32, false))
- ),
- (
- Some("test".into()),
- Arc::new(Field::new("b", DataType::UInt32, false))
- ),
- (
- Some("test2".into()),
- Arc::new(Field::new("c1", DataType::UInt32, true))
- ),
- ],
- HashMap::new()
- )?,
- );
-
- Ok(())
- }
-
- #[test]
- fn join_schema_trim_partial_join_column_projection() -> Result<()> {
- // test join column push down without explicit column projections
-
- let table_scan = test_table_scan()?;
-
- let schema = Schema::new(vec![Field::new("c1", DataType::UInt32,
false)]);
- let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?;
-
- let plan = LogicalPlanBuilder::from(table_scan)
- .join(table2_scan, JoinType::Left, (vec!["a"], vec!["c1"]), None)?
- // projecting joined column `a` should push the right side column
`c1` projection as
- // well into test2 table even though `c1` is not referenced in
projection.
- .project(vec![col("a"), col("b")])?
- .build()?;
-
- // make sure projections are pushed down to both table scans
- let expected = "Projection: test.a, test.b\
- \n Left Join: test.a = test2.c1\
- \n TableScan: test projection=[a, b]\
- \n TableScan: test2 projection=[c1]";
-
- let optimized_plan = optimize(plan)?;
- let formatted_plan = format!("{optimized_plan:?}");
- assert_eq!(formatted_plan, expected);
-
- // make sure schema for join node include both join columns
- let optimized_join = optimized_plan.inputs()[0];
- assert_eq!(
- **optimized_join.schema(),
- DFSchema::new_with_metadata(
- vec![
- (
- Some("test".into()),
- Arc::new(Field::new("a", DataType::UInt32, false))
- ),
- (
- Some("test".into()),
- Arc::new(Field::new("b", DataType::UInt32, false))
- ),
- (
- Some("test2".into()),
- Arc::new(Field::new("c1", DataType::UInt32, true))
- ),
- ],
- HashMap::new()
- )?,
- );
-
- Ok(())
- }
-
- #[test]
- fn join_schema_trim_using_join() -> Result<()> {
- // shared join columns from using join should be pushed to both sides
-
- let table_scan = test_table_scan()?;
-
- let schema = Schema::new(vec![Field::new("a", DataType::UInt32,
false)]);
- let table2_scan = scan_empty(Some("test2"), &schema, None)?.build()?;
-
- let plan = LogicalPlanBuilder::from(table_scan)
- .join_using(table2_scan, JoinType::Left, vec!["a"])?
- .project(vec![col("a"), col("b")])?
- .build()?;
-
- // make sure projections are pushed down to table scan
- let expected = "Projection: test.a, test.b\
- \n Left Join: Using test.a = test2.a\
- \n TableScan: test projection=[a, b]\
- \n TableScan: test2 projection=[a]";
-
- let optimized_plan = optimize(plan)?;
- let formatted_plan = format!("{optimized_plan:?}");
- assert_eq!(formatted_plan, expected);
-
- // make sure schema for join node include both join columns
- let optimized_join = optimized_plan.inputs()[0];
- assert_eq!(
- **optimized_join.schema(),
- DFSchema::new_with_metadata(
- vec![
- (
- Some("test".into()),
- Arc::new(Field::new("a", DataType::UInt32, false))
- ),
- (
- Some("test".into()),
- Arc::new(Field::new("b", DataType::UInt32, false))
- ),
- (
- Some("test2".into()),
- Arc::new(Field::new("a", DataType::UInt32, true))
- ),
- ],
- HashMap::new()
- )?,
- );
-
- Ok(())
- }
-
- #[test]
- fn cast() -> Result<()> {
- let table_scan = test_table_scan()?;
-
- let projection = LogicalPlanBuilder::from(table_scan)
- .project(vec![Expr::Cast(Cast::new(
- Box::new(col("c")),
- DataType::Float64,
- ))])?
- .build()?;
-
- let expected = "Projection: CAST(test.c AS Float64)\
- \n TableScan: test projection=[c]";
-
- assert_optimized_plan_eq(projection, expected)
- }
-
- #[test]
- fn table_scan_projected_schema() -> Result<()> {
- let table_scan = test_table_scan()?;
- let plan = LogicalPlanBuilder::from(test_table_scan()?)
- .project(vec![col("a"), col("b")])?
- .build()?;
-
- assert_eq!(3, table_scan.schema().fields().len());
- assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
- assert_fields_eq(&plan, vec!["a", "b"]);
-
- let expected = "TableScan: test projection=[a, b]";
-
- assert_optimized_plan_eq(plan, expected)
- }
-
- #[test]
- fn table_scan_projected_schema_non_qualified_relation() -> Result<()> {
- let table_scan = test_table_scan()?;
- let input_schema = table_scan.schema();
- assert_eq!(3, input_schema.fields().len());
- assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
-
- // Build the LogicalPlan directly (don't use PlanBuilder), so
- // that the Column references are unqualified (e.g. their
- // relation is `None`). PlanBuilder resolves the expressions
- let expr = vec![col("test.a"), col("test.b")];
- let plan =
- LogicalPlan::Projection(Projection::try_new(expr,
Arc::new(table_scan))?);
-
- assert_fields_eq(&plan, vec!["a", "b"]);
-
- let expected = "TableScan: test projection=[a, b]";
-
- assert_optimized_plan_eq(plan, expected)
- }
-
- #[test]
- fn table_limit() -> Result<()> {
- let table_scan = test_table_scan()?;
- assert_eq!(3, table_scan.schema().fields().len());
- assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
-
- let plan = LogicalPlanBuilder::from(table_scan)
- .project(vec![col("c"), col("a")])?
- .limit(0, Some(5))?
- .build()?;
-
- assert_fields_eq(&plan, vec!["c", "a"]);
-
- let expected = "Limit: skip=0, fetch=5\
- \n Projection: test.c, test.a\
- \n TableScan: test projection=[a, c]";
-
- assert_optimized_plan_eq(plan, expected)
- }
-
- #[test]
- fn table_scan_without_projection() -> Result<()> {
- let table_scan = test_table_scan()?;
- let plan = LogicalPlanBuilder::from(table_scan).build()?;
- // should expand projection to all columns without projection
- let expected = "TableScan: test projection=[a, b, c]";
- assert_optimized_plan_eq(plan, expected)
- }
-
- #[test]
- fn table_scan_with_literal_projection() -> Result<()> {
- let table_scan = test_table_scan()?;
- let plan = LogicalPlanBuilder::from(table_scan)
- .project(vec![lit(1_i64), lit(2_i64)])?
- .build()?;
- let expected = "Projection: Int64(1), Int64(2)\
- \n TableScan: test projection=[]";
- assert_optimized_plan_eq(plan, expected)
- }
-
- /// tests that it removes unused columns in projections
- #[test]
- fn table_unused_column() -> Result<()> {
- let table_scan = test_table_scan()?;
- assert_eq!(3, table_scan.schema().fields().len());
- assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
-
- // we never use "b" in the first projection => remove it
- let plan = LogicalPlanBuilder::from(table_scan)
- .project(vec![col("c"), col("a"), col("b")])?
- .filter(col("c").gt(lit(1)))?
- .aggregate(vec![col("c")], vec![max(col("a"))])?
- .build()?;
-
- assert_fields_eq(&plan, vec!["c", "MAX(test.a)"]);
-
- let plan = optimize(plan).expect("failed to optimize plan");
- let expected = "\
- Aggregate: groupBy=[[test.c]], aggr=[[MAX(test.a)]]\
- \n Filter: test.c > Int32(1)\
- \n Projection: test.c, test.a\
- \n TableScan: test projection=[a, c]";
-
- assert_optimized_plan_eq(plan, expected)
- }
-
- /// tests that it removes un-needed projections
- #[test]
- fn table_unused_projection() -> Result<()> {
- let table_scan = test_table_scan()?;
- assert_eq!(3, table_scan.schema().fields().len());
- assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
-
- // there is no need for the first projection
- let plan = LogicalPlanBuilder::from(table_scan)
- .project(vec![col("b")])?
- .project(vec![lit(1).alias("a")])?
- .build()?;
-
- assert_fields_eq(&plan, vec!["a"]);
-
- let expected = "\
- Projection: Int32(1) AS a\
- \n TableScan: test projection=[]";
-
- assert_optimized_plan_eq(plan, expected)
- }
-
- #[test]
- fn table_full_filter_pushdown() -> Result<()> {
- let schema = Schema::new(test_table_scan_fields());
-
- let table_scan = table_scan_with_filters(
- Some("test"),
- &schema,
- None,
- vec![col("b").eq(lit(1))],
- )?
- .build()?;
- assert_eq!(3, table_scan.schema().fields().len());
- assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
-
- // there is no need for the first projection
- let plan = LogicalPlanBuilder::from(table_scan)
- .project(vec![col("b")])?
- .project(vec![lit(1).alias("a")])?
- .build()?;
-
- assert_fields_eq(&plan, vec!["a"]);
-
- let expected = "\
- Projection: Int32(1) AS a\
- \n TableScan: test projection=[], full_filters=[b = Int32(1)]";
-
- assert_optimized_plan_eq(plan, expected)
- }
-
- /// tests that optimizing twice yields same plan
- #[test]
- fn test_double_optimization() -> Result<()> {
- let table_scan = test_table_scan()?;
-
- let plan = LogicalPlanBuilder::from(table_scan)
- .project(vec![col("b")])?
- .project(vec![lit(1).alias("a")])?
- .build()?;
-
- let optimized_plan1 = optimize(plan).expect("failed to optimize plan");
- let optimized_plan2 =
- optimize(optimized_plan1.clone()).expect("failed to optimize
plan");
-
- let formatted_plan1 = format!("{optimized_plan1:?}");
- let formatted_plan2 = format!("{optimized_plan2:?}");
- assert_eq!(formatted_plan1, formatted_plan2);
- Ok(())
- }
-
- /// tests that it removes an aggregate is never used downstream
- #[test]
- fn table_unused_aggregate() -> Result<()> {
- let table_scan = test_table_scan()?;
- assert_eq!(3, table_scan.schema().fields().len());
- assert_fields_eq(&table_scan, vec!["a", "b", "c"]);
-
- // we never use "min(b)" => remove it
- let plan = LogicalPlanBuilder::from(table_scan)
- .aggregate(vec![col("a"), col("c")], vec![max(col("b")),
min(col("b"))])?
- .filter(col("c").gt(lit(1)))?
- .project(vec![col("c"), col("a"), col("MAX(test.b)")])?
- .build()?;
-
- assert_fields_eq(&plan, vec!["c", "a", "MAX(test.b)"]);
-
- let expected = "Projection: test.c, test.a, MAX(test.b)\
- \n Filter: test.c > Int32(1)\
- \n Aggregate: groupBy=[[test.a, test.c]], aggr=[[MAX(test.b)]]\
- \n TableScan: test projection=[a, b, c]";
-
- assert_optimized_plan_eq(plan, expected)
- }
-
- #[test]
- fn aggregate_filter_pushdown() -> Result<()> {
- let table_scan = test_table_scan()?;
-
- let aggr_with_filter =
Expr::AggregateFunction(expr::AggregateFunction::new(
- AggregateFunction::Count,
- vec![col("b")],
- false,
- Some(Box::new(col("c").gt(lit(42)))),
- None,
- None,
- ));
-
- let plan = LogicalPlanBuilder::from(table_scan)
- .aggregate(
- vec![col("a")],
- vec![count(col("b")), aggr_with_filter.alias("count2")],
- )?
- .build()?;
-
- let expected = "Aggregate: groupBy=[[test.a]], aggr=[[COUNT(test.b),
COUNT(test.b) FILTER (WHERE test.c > Int32(42)) AS count2]]\
- \n TableScan: test projection=[a, b, c]";
-
- assert_optimized_plan_eq(plan, expected)
- }
-
- #[test]
- fn pushdown_through_distinct() -> Result<()> {
- let table_scan = test_table_scan()?;
-
- let plan = LogicalPlanBuilder::from(table_scan)
- .project(vec![col("a"), col("b")])?
- .distinct()?
- .project(vec![col("a")])?
- .build()?;
-
- let expected = "Projection: test.a\
- \n Distinct:\
- \n TableScan: test projection=[a, b]";
-
- assert_optimized_plan_eq(plan, expected)
- }
-
- #[test]
- fn test_window() -> Result<()> {
- let table_scan = test_table_scan()?;
-
- let max1 = Expr::WindowFunction(expr::WindowFunction::new(
-
WindowFunctionDefinition::AggregateFunction(AggregateFunction::Max),
- vec![col("test.a")],
- vec![col("test.b")],
- vec![],
- WindowFrame::new(None),
- None,
- ));
-
- let max2 = Expr::WindowFunction(expr::WindowFunction::new(
-
WindowFunctionDefinition::AggregateFunction(AggregateFunction::Max),
- vec![col("test.b")],
- vec![],
- vec![],
- WindowFrame::new(None),
- None,
- ));
- let col1 = col(max1.display_name()?);
- let col2 = col(max2.display_name()?);
-
- let plan = LogicalPlanBuilder::from(table_scan)
- .window(vec![max1])?
- .window(vec![max2])?
- .project(vec![col1, col2])?
- .build()?;
-
- let expected = "Projection: MAX(test.a) PARTITION BY [test.b] ROWS
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING, MAX(test.b) ROWS BETWEEN
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING\
- \n WindowAggr: windowExpr=[[MAX(test.b) ROWS BETWEEN UNBOUNDED
PRECEDING AND UNBOUNDED FOLLOWING]]\
- \n Projection: test.b, MAX(test.a) PARTITION BY [test.b] ROWS
BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING\
- \n WindowAggr: windowExpr=[[MAX(test.a) PARTITION BY [test.b]
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]\
- \n TableScan: test projection=[a, b]";
-
- assert_optimized_plan_eq(plan, expected)
- }
-
- fn assert_optimized_plan_eq(plan: LogicalPlan, expected: &str) ->
Result<()> {
- let optimized_plan = optimize(plan).expect("failed to optimize plan");
- let formatted_plan = format!("{optimized_plan:?}");
- assert_eq!(formatted_plan, expected);
- Ok(())
- }
-
- fn optimize(plan: LogicalPlan) -> Result<LogicalPlan> {
- let optimizer =
Optimizer::with_rules(vec![Arc::new(OptimizeProjections::new())]);
- let optimized_plan =
- optimizer.optimize(plan, &OptimizerContext::new(), observe)?;
-
- Ok(optimized_plan)
- }
-
- fn observe(_plan: &LogicalPlan, _rule: &dyn OptimizerRule) {}
-}