This is an automated email from the ASF dual-hosted git repository.
liukun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 1bcb333ee reimplement `limit_push_down` to remove global-state,
enhance optimize and simplify code. (#4276)
1bcb333ee is described below
commit 1bcb333ee4453c2e9e300618f9bb50bc992856c7
Author: jakevin <[email protected]>
AuthorDate: Tue Nov 22 16:30:09 2022 +0800
reimplement `limit_push_down` to remove global-state, enhance optimize and
simplify code. (#4276)
* reimplement `limit_push_down`.
* add comment
* polish ut
* fix bug, add UT
---
datafusion/optimizer/src/limit_push_down.rs | 699 +++++++++++-----------------
1 file changed, 263 insertions(+), 436 deletions(-)
diff --git a/datafusion/optimizer/src/limit_push_down.rs
b/datafusion/optimizer/src/limit_push_down.rs
index 3cda94b68..66dba0d85 100644
--- a/datafusion/optimizer/src/limit_push_down.rs
+++ b/datafusion/optimizer/src/limit_push_down.rs
@@ -17,19 +17,18 @@
//! Optimizer rule to push down LIMIT in the query plan
//! It will push down through projection, limits (taking the smaller limit)
-use crate::{OptimizerConfig, OptimizerRule};
-use datafusion_common::{DataFusionError, Result};
+use crate::{utils, OptimizerConfig, OptimizerRule};
+use datafusion_common::Result;
+use datafusion_expr::utils::from_plan;
use datafusion_expr::{
logical_plan::{
Join, JoinType, Limit, LogicalPlan, Projection, Sort, TableScan, Union,
},
- utils::from_plan,
CrossJoin,
};
use std::sync::Arc;
-/// Optimization rule that tries pushes down LIMIT n
-/// where applicable to reduce the amount of scanned / processed data.
+/// Optimization rule that tries to push down LIMIT.
#[derive(Default)]
pub struct LimitPushDown {}
@@ -40,348 +39,181 @@ impl LimitPushDown {
}
}
-/// Ancestor indicates the current ancestor in the LogicalPlan tree
-/// when traversing down related to "limit push down".
-enum Ancestor {
- /// Limit
- FromLimit { skip: usize, fetch: Option<usize> },
- /// Other nodes that don't affect the adjustment of "Limit"
- NotRelevant,
+fn push_down_join(
+ join: &Join,
+ left_limit: Option<usize>,
+ right_limit: Option<usize>,
+) -> LogicalPlan {
+ let left = match left_limit {
+ Some(limit) => LogicalPlan::Limit(Limit {
+ skip: 0,
+ fetch: Some(limit),
+ input: Arc::new((*join.left).clone()),
+ }),
+ None => (*join.left).clone(),
+ };
+ let right = match right_limit {
+ Some(limit) => LogicalPlan::Limit(Limit {
+ skip: 0,
+ fetch: Some(limit),
+ input: Arc::new((*join.right).clone()),
+ }),
+ None => (*join.right).clone(),
+ };
+ LogicalPlan::Join(Join {
+ left: Arc::new(left),
+ right: Arc::new(right),
+ on: join.on.clone(),
+ filter: join.filter.clone(),
+ join_type: join.join_type,
+ join_constraint: join.join_constraint,
+ schema: join.schema.clone(),
+ null_equals_null: join.null_equals_null,
+ })
}
-///
-/// When doing limit push down with "skip" and "fetch" during traversal,
-/// the "fetch" should be adjusted.
-/// "Ancestor" is pushed down the plan tree, so that the current node
-/// can adjust it's own "fetch".
-///
-/// If the current node is a Limit, its "fetch" is updated by:
-/// 1. extended_fetch = extended the "fetch" with ancestor's "skip".
-/// 2. min(extended_fetch, current node's fetch)
-///
-/// Current node's "skip" is never updated, it is
-/// just a hint for the child to extend its "fetch".
-///
-/// When building a new Limit in Union, the "fetch" is calculated
-/// by using ancestor's "fetch" and "skip".
-///
-/// When finally assign "limit" in TableScan, the "limit" is calculated
-/// by using ancestor's "fetch" and "skip".
-///
-fn limit_push_down(
- _optimizer: &LimitPushDown,
- ancestor: Ancestor,
- plan: &LogicalPlan,
- _optimizer_config: &OptimizerConfig,
-) -> Result<LogicalPlan> {
- match (plan, ancestor) {
- (
- LogicalPlan::Limit(Limit {
- skip: current_skip,
- fetch: current_fetch,
- input,
- }),
- ancestor,
- ) => {
- let new_current_fetch = match ancestor {
- Ancestor::FromLimit {
- skip: ancestor_skip,
- fetch: ancestor_fetch,
- } => {
- if let Some(fetch) = current_fetch {
- // extend ancestor's fetch
- let ancestor_fetch = ancestor_fetch.map(|f| f +
ancestor_skip);
-
- let new_current_fetch =
- ancestor_fetch.map_or(*fetch, |x| std::cmp::min(x,
*fetch));
-
- Some(new_current_fetch)
- } else {
- // we dont have a "fetch", and we can push down our
parent's "fetch"
- // extend ancestor's fetch
- ancestor_fetch.map(|f| f + ancestor_skip)
- }
- }
- _ => *current_fetch,
+/// Push down Limit.
+impl OptimizerRule for LimitPushDown {
+ fn optimize(
+ &self,
+ plan: &LogicalPlan,
+ optimizer_config: &mut OptimizerConfig,
+ ) -> Result<LogicalPlan> {
+ let limit = match plan {
+ LogicalPlan::Limit(limit) => limit,
+ _ => return utils::optimize_children(self, plan, optimizer_config),
+ };
+
+ if let LogicalPlan::Limit(child_limit) = &*limit.input {
+ let parent_skip = limit.skip;
+ let parent_fetch = limit.fetch;
+
+ // Merge limit
+ // Parent range [child_skip + skip, child_skip + skip + fetch)
+ // Child range [child_skip, child_skip + child_fetch)
+ // Merge -> [child_skip + skip, min(child_skip + skip + fetch,
child_skip + child_fetch) )
+ // Merge LimitPlan -> [child_skip + skip, min(fetch, child_fetch -
skip) )
+ let new_fetch = match parent_fetch {
+ Some(fetch) => match child_limit.fetch {
+ Some(child_fetch) => Some(std::cmp::min(
+ fetch,
+ fetch_minus_skip(child_fetch, parent_skip),
+ )),
+ None => Some(fetch),
+ },
+ _ => child_limit
+ .fetch
+ .map(|child_fetch| fetch_minus_skip(child_fetch,
parent_skip)),
};
- Ok(LogicalPlan::Limit(Limit {
- // current node's "skip" is not updated, updating
- // this value would violate the semantics of Limit operator
- skip: *current_skip,
- fetch: new_current_fetch,
- input: Arc::new(limit_push_down(
- _optimizer,
- Ancestor::FromLimit {
- // current node's "skip" is passing to the subtree
- // so that the child can extend the "fetch"
- skip: *current_skip,
- fetch: new_current_fetch,
- },
- input.as_ref(),
- _optimizer_config,
- )?),
- }))
- }
- (
- LogicalPlan::TableScan(TableScan {
- table_name,
- source,
- projection,
- filters,
- fetch,
- projected_schema,
- }),
- Ancestor::FromLimit {
- skip: ancestor_skip,
- fetch: Some(ancestor_fetch),
- ..
- },
- ) => {
- let ancestor_fetch = ancestor_fetch + ancestor_skip;
- Ok(LogicalPlan::TableScan(TableScan {
- table_name: table_name.clone(),
- source: source.clone(),
- projection: projection.clone(),
- filters: filters.clone(),
- fetch: fetch
- .map(|x| std::cmp::min(x, ancestor_fetch))
- .or(Some(ancestor_fetch)),
- projected_schema: projected_schema.clone(),
- }))
- }
- (
- LogicalPlan::Projection(Projection {
- expr,
- input,
- schema,
- alias,
- }),
- ancestor,
- ) => {
- // Push down limit directly (projection doesn't change number of
rows)
- Ok(LogicalPlan::Projection(
- Projection::try_new_with_schema_alias(
- expr.clone(),
- Arc::new(limit_push_down(
- _optimizer,
- ancestor,
- input.as_ref(),
- _optimizer_config,
- )?),
- schema.clone(),
- alias.clone(),
- )?,
- ))
- }
- (
- LogicalPlan::Union(Union { inputs, schema }),
- Ancestor::FromLimit {
- skip: ancestor_skip,
- fetch: Some(ancestor_fetch),
- ..
- },
- ) => {
- // Push down limit through UNION
- let ancestor_fetch = ancestor_fetch + ancestor_skip;
- let new_inputs = inputs
- .iter()
- .map(|x| {
- Ok(Arc::new(LogicalPlan::Limit(Limit {
- skip: 0,
- fetch: Some(ancestor_fetch),
- input: Arc::new(limit_push_down(
- _optimizer,
- Ancestor::FromLimit {
- skip: 0,
- fetch: Some(ancestor_fetch),
- },
- x,
- _optimizer_config,
- )?),
- })))
- })
- .collect::<Result<_>>()?;
- Ok(LogicalPlan::Union(Union {
- inputs: new_inputs,
- schema: schema.clone(),
- }))
- }
- (
- LogicalPlan::CrossJoin(cross_join),
- Ancestor::FromLimit {
- skip: ancestor_skip,
- fetch: Some(ancestor_fetch),
- ..
- },
- ) => {
- let left = &*cross_join.left;
- let right = &*cross_join.right;
- Ok(LogicalPlan::CrossJoin(CrossJoin {
- left: Arc::new(limit_push_down(
- _optimizer,
- Ancestor::FromLimit {
- skip: 0,
- fetch: Some(ancestor_fetch + ancestor_skip),
- },
- left,
- _optimizer_config,
- )?),
- right: Arc::new(limit_push_down(
- _optimizer,
- Ancestor::FromLimit {
- skip: 0,
- fetch: Some(ancestor_fetch + ancestor_skip),
- },
- right,
- _optimizer_config,
- )?),
- schema: plan.schema().clone(),
- }))
- }
- (
- LogicalPlan::Join(Join { join_type, .. }),
- Ancestor::FromLimit {
- skip: ancestor_skip,
- fetch: Some(ancestor_fetch),
- ..
- },
- ) => {
- let ancestor_fetch = ancestor_fetch + ancestor_skip;
- match join_type {
- JoinType::Left => {
- //if LeftOuter join push limit to left
- generate_push_down_join(
- _optimizer,
- _optimizer_config,
- plan,
- Some(ancestor_fetch),
- None,
- )
- }
- JoinType::Right =>
- // If RightOuter join push limit to right
- {
- generate_push_down_join(
- _optimizer,
- _optimizer_config,
- plan,
- None,
- Some(ancestor_fetch),
- )
- }
- _ => generate_push_down_join(
- _optimizer,
- _optimizer_config,
- plan,
- None,
- None,
- ),
- }
- }
- (
- LogicalPlan::Sort(Sort { expr, input, fetch }),
- Ancestor::FromLimit {
- skip: ancestor_skip,
- fetch: Some(ancestor_fetch),
- ..
- },
- ) => {
- // Update Sort `fetch`, but simply recurse through children (sort
should receive all input for sorting)
- let input = push_down_children_limit(_optimizer,
_optimizer_config, input)?;
- let sort_fetch = ancestor_skip + ancestor_fetch;
- let plan = LogicalPlan::Sort(Sort {
- expr: expr.clone(),
- input: Arc::new(input),
- fetch: Some(fetch.map(|f|
f.min(sort_fetch)).unwrap_or(sort_fetch)),
+ let plan = LogicalPlan::Limit(Limit {
+ skip: child_limit.skip + limit.skip,
+ fetch: new_fetch,
+ input: Arc::new((*child_limit.input).clone()),
});
- Ok(plan)
+ return self.optimize(&plan, optimizer_config);
}
- // For other nodes we can't push down the limit
- // But try to recurse and find other limit nodes to push down
- _ => push_down_children_limit(_optimizer, _optimizer_config, plan),
- }
-}
+ let fetch = match limit.fetch {
+ Some(fetch) => fetch,
+ None => return utils::optimize_children(self, plan,
optimizer_config),
+ };
+ let skip = limit.skip;
+
+ let plan = match &*limit.input {
+ LogicalPlan::TableScan(scan) => {
+ let limit = fetch + skip;
+ let new_input = LogicalPlan::TableScan(TableScan {
+ table_name: scan.table_name.clone(),
+ source: scan.source.clone(),
+ projection: scan.projection.clone(),
+ filters: scan.filters.clone(),
+ fetch: scan.fetch.map(|x| std::cmp::min(x,
limit)).or(Some(limit)),
+ projected_schema: scan.projected_schema.clone(),
+ });
+ from_plan(plan, &plan.expressions(), &[new_input])?
+ }
-fn generate_push_down_join(
- _optimizer: &LimitPushDown,
- _optimizer_config: &OptimizerConfig,
- join: &LogicalPlan,
- left_limit: Option<usize>,
- right_limit: Option<usize>,
-) -> Result<LogicalPlan> {
- if let LogicalPlan::Join(Join {
- left,
- right,
- on,
- filter,
- join_type,
- join_constraint,
- schema,
- null_equals_null,
- }) = join
- {
- Ok(LogicalPlan::Join(Join {
- left: Arc::new(limit_push_down(
- _optimizer,
- Ancestor::FromLimit {
+ LogicalPlan::Projection(projection) => {
+ let new_input = LogicalPlan::Limit(Limit {
+ skip,
+ fetch: Some(fetch),
+ input: Arc::new((*projection.input).clone()),
+ });
+ // Push down limit directly (projection doesn't change number
of rows)
+ LogicalPlan::Projection(Projection::try_new_with_schema_alias(
+ projection.expr.clone(),
+ Arc::new(new_input),
+ projection.schema.clone(),
+ projection.alias.clone(),
+ )?)
+ }
+
+ LogicalPlan::Union(union) => {
+ let new_inputs = union
+ .inputs
+ .iter()
+ .map(|x| {
+ Ok(Arc::new(LogicalPlan::Limit(Limit {
+ skip: 0,
+ fetch: Some(fetch + skip),
+ input: Arc::new((**x).clone()),
+ })))
+ })
+ .collect::<Result<_>>()?;
+ let union = LogicalPlan::Union(Union {
+ inputs: new_inputs,
+ schema: union.schema.clone(),
+ });
+ from_plan(plan, &plan.expressions(), &[union])?
+ }
+
+ LogicalPlan::CrossJoin(cross_join) => {
+ let left = &*cross_join.left;
+ let right = &*cross_join.right;
+ let new_left = LogicalPlan::Limit(Limit {
skip: 0,
- fetch: left_limit,
- },
- left.as_ref(),
- _optimizer_config,
- )?),
- right: Arc::new(limit_push_down(
- _optimizer,
- Ancestor::FromLimit {
+ fetch: Some(fetch + skip),
+ input: Arc::new(left.clone()),
+ });
+ let new_right = LogicalPlan::Limit(Limit {
skip: 0,
- fetch: right_limit,
- },
- right.as_ref(),
- _optimizer_config,
- )?),
- on: on.clone(),
- filter: filter.clone(),
- join_type: *join_type,
- join_constraint: *join_constraint,
- schema: schema.clone(),
- null_equals_null: *null_equals_null,
- }))
- } else {
- Err(DataFusionError::Internal(format!(
- "{:?} must be join type",
- join
- )))
- }
-}
+ fetch: Some(fetch + skip),
+ input: Arc::new(right.clone()),
+ });
+ let new_input = LogicalPlan::CrossJoin(CrossJoin {
+ left: Arc::new(new_left),
+ right: Arc::new(new_right),
+ schema: plan.schema().clone(),
+ });
+ from_plan(plan, &plan.expressions(), &[new_input])?
+ }
-fn push_down_children_limit(
- _optimizer: &LimitPushDown,
- _optimizer_config: &OptimizerConfig,
- plan: &LogicalPlan,
-) -> Result<LogicalPlan> {
- let expr = plan.expressions();
-
- // apply the optimization to all inputs of the plan
- let inputs = plan.inputs();
- let new_inputs = inputs
- .iter()
- .map(|plan| {
- limit_push_down(_optimizer, Ancestor::NotRelevant, plan,
_optimizer_config)
- })
- .collect::<Result<Vec<_>>>()?;
-
- from_plan(plan, &expr, &new_inputs)
-}
+ LogicalPlan::Join(join) => {
+ let limit = fetch + skip;
+ let new_join = match join.join_type {
+ JoinType::Left => push_down_join(join, Some(limit), None),
+ JoinType::Right => push_down_join(join, None, Some(limit)),
+ _ => push_down_join(join, None, None),
+ };
+ from_plan(plan, &plan.expressions(), &[new_join])?
+ }
-impl OptimizerRule for LimitPushDown {
- fn optimize(
- &self,
- plan: &LogicalPlan,
- optimizer_config: &mut OptimizerConfig,
- ) -> Result<LogicalPlan> {
- limit_push_down(self, Ancestor::NotRelevant, plan, optimizer_config)
+ LogicalPlan::Sort(sort) => {
+ let sort_fetch = skip + fetch;
+ let new_input = LogicalPlan::Sort(Sort {
+ expr: sort.expr.clone(),
+ input: Arc::new((*sort.input).clone()),
+ fetch: Some(
+ sort.fetch.map(|f|
f.min(sort_fetch)).unwrap_or(sort_fetch),
+ ),
+ });
+ from_plan(plan, &plan.expressions(), &[new_input])?
+ }
+ _ => plan.clone(),
+ };
+
+ utils::optimize_children(self, &plan, optimizer_config)
}
fn name(&self) -> &str {
@@ -389,6 +221,14 @@ impl OptimizerRule for LimitPushDown {
}
}
+fn fetch_minus_skip(fetch: usize, skip: usize) -> usize {
+ if skip > fetch {
+ 0
+ } else {
+ fetch - skip
+ }
+}
+
#[cfg(test)]
mod test {
use std::vec;
@@ -401,13 +241,17 @@ mod test {
max,
};
- fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
- let rule = LimitPushDown::new();
- let optimized_plan = rule
+ fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) ->
Result<()> {
+ let optimized_plan = LimitPushDown::new()
.optimize(plan, &mut OptimizerConfig::new())
.expect("failed to optimize plan");
+
let formatted_plan = format!("{:?}", optimized_plan);
+
assert_eq!(formatted_plan, expected);
+ assert_eq!(optimized_plan.schema(), plan.schema());
+
+ Ok(())
}
#[test]
@@ -421,13 +265,11 @@ mod test {
// Should push the limit down to table provider
// When it has a select
- let expected = "Limit: skip=0, fetch=1000\
- \n Projection: test.a\
+ let expected = "Projection: test.a\
+ \n Limit: skip=0, fetch=1000\
\n TableScan: test, fetch=1000";
- assert_optimized_plan_eq(&plan, expected);
-
- Ok(())
+ assert_optimized_plan_eq(&plan, expected)
}
#[test]
@@ -443,12 +285,9 @@ mod test {
// Towards table scan
// This rule doesn't replace multiple limits
let expected = "Limit: skip=0, fetch=10\
- \n Limit: skip=0, fetch=10\
- \n TableScan: test, fetch=10";
-
- assert_optimized_plan_eq(&plan, expected);
+ \n TableScan: test, fetch=10";
- Ok(())
+ assert_optimized_plan_eq(&plan, expected)
}
#[test]
@@ -465,9 +304,7 @@ mod test {
\n Aggregate: groupBy=[[test.a]], aggr=[[MAX(test.b)]]\
\n TableScan: test";
- assert_optimized_plan_eq(&plan, expected);
-
- Ok(())
+ assert_optimized_plan_eq(&plan, expected)
}
#[test]
@@ -487,9 +324,7 @@ mod test {
\n Limit: skip=0, fetch=1000\
\n TableScan: test, fetch=1000";
- assert_optimized_plan_eq(&plan, expected);
-
- Ok(())
+ assert_optimized_plan_eq(&plan, expected)
}
#[test]
@@ -506,9 +341,7 @@ mod test {
\n Sort: test.a, fetch=10\
\n TableScan: test";
- assert_optimized_plan_eq(&plan, expected);
-
- Ok(())
+ assert_optimized_plan_eq(&plan, expected)
}
#[test]
@@ -525,9 +358,7 @@ mod test {
\n Sort: test.a, fetch=15\
\n TableScan: test";
- assert_optimized_plan_eq(&plan, expected);
-
- Ok(())
+ assert_optimized_plan_eq(&plan, expected)
}
#[test]
@@ -546,9 +377,7 @@ mod test {
\n Limit: skip=0, fetch=1000\
\n TableScan: test, fetch=1000";
- assert_optimized_plan_eq(&plan, expected);
-
- Ok(())
+ assert_optimized_plan_eq(&plan, expected)
}
#[test]
@@ -563,8 +392,7 @@ mod test {
let expected = "Limit: skip=10, fetch=None\
\n TableScan: test";
- assert_optimized_plan_eq(&plan, expected);
- Ok(())
+ assert_optimized_plan_eq(&plan, expected)
}
#[test]
@@ -578,13 +406,11 @@ mod test {
// Should push the limit down to table provider
// When it has a select
- let expected = "Limit: skip=10, fetch=1000\
- \n Projection: test.a\
+ let expected = "Projection: test.a\
+ \n Limit: skip=10, fetch=1000\
\n TableScan: test, fetch=1010";
- assert_optimized_plan_eq(&plan, expected);
-
- Ok(())
+ assert_optimized_plan_eq(&plan, expected)
}
#[test]
@@ -597,14 +423,11 @@ mod test {
.limit(10, None)?
.build()?;
- let expected = "Limit: skip=10, fetch=None\
- \n Limit: skip=0, fetch=1000\
- \n Projection: test.a\
- \n TableScan: test, fetch=1000";
-
- assert_optimized_plan_eq(&plan, expected);
+ let expected = "Projection: test.a\
+ \n Limit: skip=10, fetch=990\
+ \n TableScan: test, fetch=1000";
- Ok(())
+ assert_optimized_plan_eq(&plan, expected)
}
#[test]
@@ -617,14 +440,11 @@ mod test {
.limit(0, Some(1000))?
.build()?;
- let expected = "Limit: skip=0, fetch=1000\
+ let expected = "Projection: test.a\
\n Limit: skip=10, fetch=1000\
- \n Projection: test.a\
- \n TableScan: test, fetch=1010";
-
- assert_optimized_plan_eq(&plan, expected);
+ \n TableScan: test, fetch=1010";
- Ok(())
+ assert_optimized_plan_eq(&plan, expected)
}
#[test]
@@ -637,17 +457,10 @@ mod test {
.limit(0, Some(10))?
.build()?;
- // Should push down the smallest limit
- // Towards table scan
- // This rule doesn't replace multiple limits
- let expected = "Limit: skip=0, fetch=10\
- \n Limit: skip=0, fetch=10\
- \n Limit: skip=10, fetch=10\
- \n TableScan: test, fetch=20";
-
- assert_optimized_plan_eq(&plan, expected);
+ let expected = "Limit: skip=10, fetch=10\
+ \n TableScan: test, fetch=20";
- Ok(())
+ assert_optimized_plan_eq(&plan, expected)
}
#[test]
@@ -664,9 +477,7 @@ mod test {
\n Aggregate: groupBy=[[test.a]], aggr=[[MAX(test.b)]]\
\n TableScan: test";
- assert_optimized_plan_eq(&plan, expected);
-
- Ok(())
+ assert_optimized_plan_eq(&plan, expected)
}
#[test]
@@ -686,9 +497,7 @@ mod test {
\n Limit: skip=0, fetch=1010\
\n TableScan: test, fetch=1010";
- assert_optimized_plan_eq(&plan, expected);
-
- Ok(())
+ assert_optimized_plan_eq(&plan, expected)
}
#[test]
@@ -712,9 +521,7 @@ mod test {
\n TableScan: test\
\n TableScan: test2";
- assert_optimized_plan_eq(&plan, expected);
-
- Ok(())
+ assert_optimized_plan_eq(&plan, expected)
}
#[test]
@@ -738,9 +545,7 @@ mod test {
\n TableScan: test\
\n TableScan: test2";
- assert_optimized_plan_eq(&plan, expected);
-
- Ok(())
+ assert_optimized_plan_eq(&plan, expected)
}
#[test]
@@ -769,9 +574,7 @@ mod test {
\n Projection: test2.a\
\n TableScan: test2";
- assert_optimized_plan_eq(&outer_query, expected);
-
- Ok(())
+ assert_optimized_plan_eq(&outer_query, expected)
}
#[test]
@@ -800,9 +603,7 @@ mod test {
\n Projection: test2.a\
\n TableScan: test2";
- assert_optimized_plan_eq(&outer_query, expected);
-
- Ok(())
+ assert_optimized_plan_eq(&outer_query, expected)
}
#[test]
@@ -823,12 +624,11 @@ mod test {
// Limit pushdown Not supported in Join
let expected = "Limit: skip=0, fetch=1000\
\n Left Join: test.a = test2.a\
- \n TableScan: test, fetch=1000\
+ \n Limit: skip=0, fetch=1000\
+ \n TableScan: test, fetch=1000\
\n TableScan: test2";
- assert_optimized_plan_eq(&plan, expected);
-
- Ok(())
+ assert_optimized_plan_eq(&plan, expected)
}
#[test]
@@ -849,12 +649,11 @@ mod test {
// Limit pushdown Not supported in Join
let expected = "Limit: skip=10, fetch=1000\
\n Left Join: test.a = test2.a\
- \n TableScan: test, fetch=1010\
+ \n Limit: skip=0, fetch=1010\
+ \n TableScan: test, fetch=1010\
\n TableScan: test2";
- assert_optimized_plan_eq(&plan, expected);
-
- Ok(())
+ assert_optimized_plan_eq(&plan, expected)
}
#[test]
@@ -876,11 +675,10 @@ mod test {
let expected = "Limit: skip=0, fetch=1000\
\n Right Join: test.a = test2.a\
\n TableScan: test\
- \n TableScan: test2, fetch=1000";
-
- assert_optimized_plan_eq(&plan, expected);
+ \n Limit: skip=0, fetch=1000\
+ \n TableScan: test2, fetch=1000";
- Ok(())
+ assert_optimized_plan_eq(&plan, expected)
}
#[test]
@@ -902,11 +700,10 @@ mod test {
let expected = "Limit: skip=10, fetch=1000\
\n Right Join: test.a = test2.a\
\n TableScan: test\
- \n TableScan: test2, fetch=1010";
-
- assert_optimized_plan_eq(&plan, expected);
+ \n Limit: skip=0, fetch=1010\
+ \n TableScan: test2, fetch=1010";
- Ok(())
+ assert_optimized_plan_eq(&plan, expected)
}
#[test]
@@ -921,12 +718,12 @@ mod test {
let expected = "Limit: skip=0, fetch=1000\
\n CrossJoin:\
- \n TableScan: test, fetch=1000\
- \n TableScan: test2, fetch=1000";
-
- assert_optimized_plan_eq(&plan, expected);
+ \n Limit: skip=0, fetch=1000\
+ \n TableScan: test, fetch=1000\
+ \n Limit: skip=0, fetch=1000\
+ \n TableScan: test2, fetch=1000";
- Ok(())
+ assert_optimized_plan_eq(&plan, expected)
}
#[test]
@@ -941,11 +738,41 @@ mod test {
let expected = "Limit: skip=1000, fetch=1000\
\n CrossJoin:\
- \n TableScan: test, fetch=2000\
- \n TableScan: test2, fetch=2000";
+ \n Limit: skip=0, fetch=2000\
+ \n TableScan: test, fetch=2000\
+ \n Limit: skip=0, fetch=2000\
+ \n TableScan: test2, fetch=2000";
- assert_optimized_plan_eq(&plan, expected);
+ assert_optimized_plan_eq(&plan, expected)
+ }
- Ok(())
+ #[test]
+ fn merge_limit_result_empty() -> Result<()> {
+ let scan = test_table_scan()?;
+
+ let plan = LogicalPlanBuilder::from(scan)
+ .limit(0, Some(1000))?
+ .limit(1000, None)?
+ .build()?;
+
+ let expected = "Limit: skip=1000, fetch=0\
+ \n TableScan: test, fetch=1000";
+
+ assert_optimized_plan_eq(&plan, expected)
+ }
+
+ #[test]
+ fn skip_great_than_fetch() -> Result<()> {
+ let scan = test_table_scan()?;
+
+ let plan = LogicalPlanBuilder::from(scan)
+ .limit(0, Some(1))?
+ .limit(1000, None)?
+ .build()?;
+
+ let expected = "Limit: skip=1000, fetch=0\
+ \n TableScan: test, fetch=1000";
+
+ assert_optimized_plan_eq(&plan, expected)
}
}