This is an automated email from the ASF dual-hosted git repository.

liukun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bcb333ee reimplement `limit_push_down` to remove global-state, enhance optimize and simplify code. (#4276)
1bcb333ee is described below

commit 1bcb333ee4453c2e9e300618f9bb50bc992856c7
Author: jakevin <[email protected]>
AuthorDate: Tue Nov 22 16:30:09 2022 +0800

    reimplement `limit_push_down` to remove global-state, enhance optimize and simplify code. (#4276)
    
    * reimplement `limit_push_down`.
    
    * add comment
    
    * polish ut
    
    * fix bug, add UT
---
 datafusion/optimizer/src/limit_push_down.rs | 699 +++++++++++-----------------
 1 file changed, 263 insertions(+), 436 deletions(-)

diff --git a/datafusion/optimizer/src/limit_push_down.rs b/datafusion/optimizer/src/limit_push_down.rs
index 3cda94b68..66dba0d85 100644
--- a/datafusion/optimizer/src/limit_push_down.rs
+++ b/datafusion/optimizer/src/limit_push_down.rs
@@ -17,19 +17,18 @@
 
 //! Optimizer rule to push down LIMIT in the query plan
 //! It will push down through projection, limits (taking the smaller limit)
-use crate::{OptimizerConfig, OptimizerRule};
-use datafusion_common::{DataFusionError, Result};
+use crate::{utils, OptimizerConfig, OptimizerRule};
+use datafusion_common::Result;
+use datafusion_expr::utils::from_plan;
 use datafusion_expr::{
     logical_plan::{
         Join, JoinType, Limit, LogicalPlan, Projection, Sort, TableScan, Union,
     },
-    utils::from_plan,
     CrossJoin,
 };
 use std::sync::Arc;
 
-/// Optimization rule that tries pushes down LIMIT n
-/// where applicable to reduce the amount of scanned / processed data.
+/// Optimization rule that tries to push down LIMIT.
 #[derive(Default)]
 pub struct LimitPushDown {}
 
@@ -40,348 +39,181 @@ impl LimitPushDown {
     }
 }
 
-/// Ancestor indicates the current ancestor in the LogicalPlan tree
-/// when traversing down related to "limit push down".
-enum Ancestor {
-    /// Limit
-    FromLimit { skip: usize, fetch: Option<usize> },
-    /// Other nodes that don't affect the adjustment of "Limit"
-    NotRelevant,
+fn push_down_join(
+    join: &Join,
+    left_limit: Option<usize>,
+    right_limit: Option<usize>,
+) -> LogicalPlan {
+    let left = match left_limit {
+        Some(limit) => LogicalPlan::Limit(Limit {
+            skip: 0,
+            fetch: Some(limit),
+            input: Arc::new((*join.left).clone()),
+        }),
+        None => (*join.left).clone(),
+    };
+    let right = match right_limit {
+        Some(limit) => LogicalPlan::Limit(Limit {
+            skip: 0,
+            fetch: Some(limit),
+            input: Arc::new((*join.right).clone()),
+        }),
+        None => (*join.right).clone(),
+    };
+    LogicalPlan::Join(Join {
+        left: Arc::new(left),
+        right: Arc::new(right),
+        on: join.on.clone(),
+        filter: join.filter.clone(),
+        join_type: join.join_type,
+        join_constraint: join.join_constraint,
+        schema: join.schema.clone(),
+        null_equals_null: join.null_equals_null,
+    })
 }
 
-///
-/// When doing limit push down with "skip" and "fetch" during traversal,
-/// the "fetch" should be adjusted.
-/// "Ancestor" is pushed down the plan tree, so that the current node
-/// can adjust it's own "fetch".
-///
-/// If the current node is a Limit, its "fetch" is updated by:
-/// 1. extended_fetch = extended the "fetch" with ancestor's "skip".
-/// 2. min(extended_fetch, current node's fetch)
-///
-/// Current node's "skip" is never updated, it is
-/// just a hint for the child to extend its "fetch".
-///
-/// When building a new Limit in Union, the "fetch" is calculated
-/// by using ancestor's "fetch" and "skip".
-///
-/// When finally assign "limit" in TableScan, the "limit" is calculated
-/// by using ancestor's "fetch" and "skip".
-///
-fn limit_push_down(
-    _optimizer: &LimitPushDown,
-    ancestor: Ancestor,
-    plan: &LogicalPlan,
-    _optimizer_config: &OptimizerConfig,
-) -> Result<LogicalPlan> {
-    match (plan, ancestor) {
-        (
-            LogicalPlan::Limit(Limit {
-                skip: current_skip,
-                fetch: current_fetch,
-                input,
-            }),
-            ancestor,
-        ) => {
-            let new_current_fetch = match ancestor {
-                Ancestor::FromLimit {
-                    skip: ancestor_skip,
-                    fetch: ancestor_fetch,
-                } => {
-                    if let Some(fetch) = current_fetch {
-                        // extend ancestor's fetch
-                        let ancestor_fetch = ancestor_fetch.map(|f| f + ancestor_skip);
-
-                        let new_current_fetch =
-                            ancestor_fetch.map_or(*fetch, |x| std::cmp::min(x, *fetch));
-
-                        Some(new_current_fetch)
-                    } else {
-                        // we dont have a "fetch", and we can push down our parent's "fetch"
-                        // extend ancestor's fetch
-                        ancestor_fetch.map(|f| f + ancestor_skip)
-                    }
-                }
-                _ => *current_fetch,
+/// Push down Limit.
+impl OptimizerRule for LimitPushDown {
+    fn optimize(
+        &self,
+        plan: &LogicalPlan,
+        optimizer_config: &mut OptimizerConfig,
+    ) -> Result<LogicalPlan> {
+        let limit = match plan {
+            LogicalPlan::Limit(limit) => limit,
+            _ => return utils::optimize_children(self, plan, optimizer_config),
+        };
+
+        if let LogicalPlan::Limit(child_limit) = &*limit.input {
+            let parent_skip = limit.skip;
+            let parent_fetch = limit.fetch;
+
+            // Merge limit
+            // Parent range [child_skip + skip, child_skip + skip + fetch)
+            // Child range [child_skip, child_skip + child_fetch)
+            // Merge -> [child_skip + skip, min(child_skip + skip + fetch, child_skip + child_fetch) )
+            // Merge LimitPlan -> [child_skip + skip, min(fetch, child_fetch - skip) )
+            let new_fetch = match parent_fetch {
+                Some(fetch) => match child_limit.fetch {
+                    Some(child_fetch) => Some(std::cmp::min(
+                        fetch,
+                        fetch_minus_skip(child_fetch, parent_skip),
+                    )),
+                    None => Some(fetch),
+                },
+                _ => child_limit
+                    .fetch
+                    .map(|child_fetch| fetch_minus_skip(child_fetch, parent_skip)),
             };
 
-            Ok(LogicalPlan::Limit(Limit {
-                // current node's "skip" is not updated, updating
-                // this value would violate the semantics of Limit operator
-                skip: *current_skip,
-                fetch: new_current_fetch,
-                input: Arc::new(limit_push_down(
-                    _optimizer,
-                    Ancestor::FromLimit {
-                        // current node's "skip" is passing to the subtree
-                        // so that the child can extend the "fetch"
-                        skip: *current_skip,
-                        fetch: new_current_fetch,
-                    },
-                    input.as_ref(),
-                    _optimizer_config,
-                )?),
-            }))
-        }
-        (
-            LogicalPlan::TableScan(TableScan {
-                table_name,
-                source,
-                projection,
-                filters,
-                fetch,
-                projected_schema,
-            }),
-            Ancestor::FromLimit {
-                skip: ancestor_skip,
-                fetch: Some(ancestor_fetch),
-                ..
-            },
-        ) => {
-            let ancestor_fetch = ancestor_fetch + ancestor_skip;
-            Ok(LogicalPlan::TableScan(TableScan {
-                table_name: table_name.clone(),
-                source: source.clone(),
-                projection: projection.clone(),
-                filters: filters.clone(),
-                fetch: fetch
-                    .map(|x| std::cmp::min(x, ancestor_fetch))
-                    .or(Some(ancestor_fetch)),
-                projected_schema: projected_schema.clone(),
-            }))
-        }
-        (
-            LogicalPlan::Projection(Projection {
-                expr,
-                input,
-                schema,
-                alias,
-            }),
-            ancestor,
-        ) => {
-            // Push down limit directly (projection doesn't change number of rows)
-            Ok(LogicalPlan::Projection(
-                Projection::try_new_with_schema_alias(
-                    expr.clone(),
-                    Arc::new(limit_push_down(
-                        _optimizer,
-                        ancestor,
-                        input.as_ref(),
-                        _optimizer_config,
-                    )?),
-                    schema.clone(),
-                    alias.clone(),
-                )?,
-            ))
-        }
-        (
-            LogicalPlan::Union(Union { inputs, schema }),
-            Ancestor::FromLimit {
-                skip: ancestor_skip,
-                fetch: Some(ancestor_fetch),
-                ..
-            },
-        ) => {
-            // Push down limit through UNION
-            let ancestor_fetch = ancestor_fetch + ancestor_skip;
-            let new_inputs = inputs
-                .iter()
-                .map(|x| {
-                    Ok(Arc::new(LogicalPlan::Limit(Limit {
-                        skip: 0,
-                        fetch: Some(ancestor_fetch),
-                        input: Arc::new(limit_push_down(
-                            _optimizer,
-                            Ancestor::FromLimit {
-                                skip: 0,
-                                fetch: Some(ancestor_fetch),
-                            },
-                            x,
-                            _optimizer_config,
-                        )?),
-                    })))
-                })
-                .collect::<Result<_>>()?;
-            Ok(LogicalPlan::Union(Union {
-                inputs: new_inputs,
-                schema: schema.clone(),
-            }))
-        }
-        (
-            LogicalPlan::CrossJoin(cross_join),
-            Ancestor::FromLimit {
-                skip: ancestor_skip,
-                fetch: Some(ancestor_fetch),
-                ..
-            },
-        ) => {
-            let left = &*cross_join.left;
-            let right = &*cross_join.right;
-            Ok(LogicalPlan::CrossJoin(CrossJoin {
-                left: Arc::new(limit_push_down(
-                    _optimizer,
-                    Ancestor::FromLimit {
-                        skip: 0,
-                        fetch: Some(ancestor_fetch + ancestor_skip),
-                    },
-                    left,
-                    _optimizer_config,
-                )?),
-                right: Arc::new(limit_push_down(
-                    _optimizer,
-                    Ancestor::FromLimit {
-                        skip: 0,
-                        fetch: Some(ancestor_fetch + ancestor_skip),
-                    },
-                    right,
-                    _optimizer_config,
-                )?),
-                schema: plan.schema().clone(),
-            }))
-        }
-        (
-            LogicalPlan::Join(Join { join_type, .. }),
-            Ancestor::FromLimit {
-                skip: ancestor_skip,
-                fetch: Some(ancestor_fetch),
-                ..
-            },
-        ) => {
-            let ancestor_fetch = ancestor_fetch + ancestor_skip;
-            match join_type {
-                JoinType::Left => {
-                    //if LeftOuter join push limit to left
-                    generate_push_down_join(
-                        _optimizer,
-                        _optimizer_config,
-                        plan,
-                        Some(ancestor_fetch),
-                        None,
-                    )
-                }
-                JoinType::Right =>
-                // If RightOuter join  push limit to right
-                {
-                    generate_push_down_join(
-                        _optimizer,
-                        _optimizer_config,
-                        plan,
-                        None,
-                        Some(ancestor_fetch),
-                    )
-                }
-                _ => generate_push_down_join(
-                    _optimizer,
-                    _optimizer_config,
-                    plan,
-                    None,
-                    None,
-                ),
-            }
-        }
-        (
-            LogicalPlan::Sort(Sort { expr, input, fetch }),
-            Ancestor::FromLimit {
-                skip: ancestor_skip,
-                fetch: Some(ancestor_fetch),
-                ..
-            },
-        ) => {
-            // Update Sort `fetch`, but simply recurse through children (sort should receive all input for sorting)
-            let input = push_down_children_limit(_optimizer, _optimizer_config, input)?;
-            let sort_fetch = ancestor_skip + ancestor_fetch;
-            let plan = LogicalPlan::Sort(Sort {
-                expr: expr.clone(),
-                input: Arc::new(input),
-                fetch: Some(fetch.map(|f| f.min(sort_fetch)).unwrap_or(sort_fetch)),
+            let plan = LogicalPlan::Limit(Limit {
+                skip: child_limit.skip + limit.skip,
+                fetch: new_fetch,
+                input: Arc::new((*child_limit.input).clone()),
             });
-            Ok(plan)
+            return self.optimize(&plan, optimizer_config);
         }
 
-        // For other nodes we can't push down the limit
-        // But try to recurse and find other limit nodes to push down
-        _ => push_down_children_limit(_optimizer, _optimizer_config, plan),
-    }
-}
+        let fetch = match limit.fetch {
+            Some(fetch) => fetch,
+            None => return utils::optimize_children(self, plan, optimizer_config),
+        };
+        let skip = limit.skip;
+
+        let plan = match &*limit.input {
+            LogicalPlan::TableScan(scan) => {
+                let limit = fetch + skip;
+                let new_input = LogicalPlan::TableScan(TableScan {
+                    table_name: scan.table_name.clone(),
+                    source: scan.source.clone(),
+                    projection: scan.projection.clone(),
+                    filters: scan.filters.clone(),
+                    fetch: scan.fetch.map(|x| std::cmp::min(x, limit)).or(Some(limit)),
+                    projected_schema: scan.projected_schema.clone(),
+                });
+                from_plan(plan, &plan.expressions(), &[new_input])?
+            }
 
-fn generate_push_down_join(
-    _optimizer: &LimitPushDown,
-    _optimizer_config: &OptimizerConfig,
-    join: &LogicalPlan,
-    left_limit: Option<usize>,
-    right_limit: Option<usize>,
-) -> Result<LogicalPlan> {
-    if let LogicalPlan::Join(Join {
-        left,
-        right,
-        on,
-        filter,
-        join_type,
-        join_constraint,
-        schema,
-        null_equals_null,
-    }) = join
-    {
-        Ok(LogicalPlan::Join(Join {
-            left: Arc::new(limit_push_down(
-                _optimizer,
-                Ancestor::FromLimit {
+            LogicalPlan::Projection(projection) => {
+                let new_input = LogicalPlan::Limit(Limit {
+                    skip,
+                    fetch: Some(fetch),
+                    input: Arc::new((*projection.input).clone()),
+                });
+                // Push down limit directly (projection doesn't change number of rows)
+                LogicalPlan::Projection(Projection::try_new_with_schema_alias(
+                    projection.expr.clone(),
+                    Arc::new(new_input),
+                    projection.schema.clone(),
+                    projection.alias.clone(),
+                )?)
+            }
+
+            LogicalPlan::Union(union) => {
+                let new_inputs = union
+                    .inputs
+                    .iter()
+                    .map(|x| {
+                        Ok(Arc::new(LogicalPlan::Limit(Limit {
+                            skip: 0,
+                            fetch: Some(fetch + skip),
+                            input: Arc::new((**x).clone()),
+                        })))
+                    })
+                    .collect::<Result<_>>()?;
+                let union = LogicalPlan::Union(Union {
+                    inputs: new_inputs,
+                    schema: union.schema.clone(),
+                });
+                from_plan(plan, &plan.expressions(), &[union])?
+            }
+
+            LogicalPlan::CrossJoin(cross_join) => {
+                let left = &*cross_join.left;
+                let right = &*cross_join.right;
+                let new_left = LogicalPlan::Limit(Limit {
                     skip: 0,
-                    fetch: left_limit,
-                },
-                left.as_ref(),
-                _optimizer_config,
-            )?),
-            right: Arc::new(limit_push_down(
-                _optimizer,
-                Ancestor::FromLimit {
+                    fetch: Some(fetch + skip),
+                    input: Arc::new(left.clone()),
+                });
+                let new_right = LogicalPlan::Limit(Limit {
                     skip: 0,
-                    fetch: right_limit,
-                },
-                right.as_ref(),
-                _optimizer_config,
-            )?),
-            on: on.clone(),
-            filter: filter.clone(),
-            join_type: *join_type,
-            join_constraint: *join_constraint,
-            schema: schema.clone(),
-            null_equals_null: *null_equals_null,
-        }))
-    } else {
-        Err(DataFusionError::Internal(format!(
-            "{:?} must be join type",
-            join
-        )))
-    }
-}
+                    fetch: Some(fetch + skip),
+                    input: Arc::new(right.clone()),
+                });
+                let new_input = LogicalPlan::CrossJoin(CrossJoin {
+                    left: Arc::new(new_left),
+                    right: Arc::new(new_right),
+                    schema: plan.schema().clone(),
+                });
+                from_plan(plan, &plan.expressions(), &[new_input])?
+            }
 
-fn push_down_children_limit(
-    _optimizer: &LimitPushDown,
-    _optimizer_config: &OptimizerConfig,
-    plan: &LogicalPlan,
-) -> Result<LogicalPlan> {
-    let expr = plan.expressions();
-
-    // apply the optimization to all inputs of the plan
-    let inputs = plan.inputs();
-    let new_inputs = inputs
-        .iter()
-        .map(|plan| {
-            limit_push_down(_optimizer, Ancestor::NotRelevant, plan, _optimizer_config)
-        })
-        .collect::<Result<Vec<_>>>()?;
-
-    from_plan(plan, &expr, &new_inputs)
-}
+            LogicalPlan::Join(join) => {
+                let limit = fetch + skip;
+                let new_join = match join.join_type {
+                    JoinType::Left => push_down_join(join, Some(limit), None),
+                    JoinType::Right => push_down_join(join, None, Some(limit)),
+                    _ => push_down_join(join, None, None),
+                };
+                from_plan(plan, &plan.expressions(), &[new_join])?
+            }
 
-impl OptimizerRule for LimitPushDown {
-    fn optimize(
-        &self,
-        plan: &LogicalPlan,
-        optimizer_config: &mut OptimizerConfig,
-    ) -> Result<LogicalPlan> {
-        limit_push_down(self, Ancestor::NotRelevant, plan, optimizer_config)
+            LogicalPlan::Sort(sort) => {
+                let sort_fetch = skip + fetch;
+                let new_input = LogicalPlan::Sort(Sort {
+                    expr: sort.expr.clone(),
+                    input: Arc::new((*sort.input).clone()),
+                    fetch: Some(
+                        sort.fetch.map(|f| f.min(sort_fetch)).unwrap_or(sort_fetch),
+                    ),
+                });
+                from_plan(plan, &plan.expressions(), &[new_input])?
+            }
+            _ => plan.clone(),
+        };
+
+        utils::optimize_children(self, &plan, optimizer_config)
     }
 
     fn name(&self) -> &str {
@@ -389,6 +221,14 @@ impl OptimizerRule for LimitPushDown {
     }
 }
 
+fn fetch_minus_skip(fetch: usize, skip: usize) -> usize {
+    if skip > fetch {
+        0
+    } else {
+        fetch - skip
+    }
+}
+
 #[cfg(test)]
 mod test {
     use std::vec;
@@ -401,13 +241,17 @@ mod test {
         max,
     };
 
-    fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) {
-        let rule = LimitPushDown::new();
-        let optimized_plan = rule
+    fn assert_optimized_plan_eq(plan: &LogicalPlan, expected: &str) -> Result<()> {
+        let optimized_plan = LimitPushDown::new()
             .optimize(plan, &mut OptimizerConfig::new())
             .expect("failed to optimize plan");
+
         let formatted_plan = format!("{:?}", optimized_plan);
+
         assert_eq!(formatted_plan, expected);
+        assert_eq!(optimized_plan.schema(), plan.schema());
+
+        Ok(())
     }
 
     #[test]
@@ -421,13 +265,11 @@ mod test {
 
         // Should push the limit down to table provider
         // When it has a select
-        let expected = "Limit: skip=0, fetch=1000\
-        \n  Projection: test.a\
+        let expected = "Projection: test.a\
+        \n  Limit: skip=0, fetch=1000\
         \n    TableScan: test, fetch=1000";
 
-        assert_optimized_plan_eq(&plan, expected);
-
-        Ok(())
+        assert_optimized_plan_eq(&plan, expected)
     }
 
     #[test]
@@ -443,12 +285,9 @@ mod test {
         // Towards table scan
         // This rule doesn't replace multiple limits
         let expected = "Limit: skip=0, fetch=10\
-        \n  Limit: skip=0, fetch=10\
-        \n    TableScan: test, fetch=10";
-
-        assert_optimized_plan_eq(&plan, expected);
+        \n  TableScan: test, fetch=10";
 
-        Ok(())
+        assert_optimized_plan_eq(&plan, expected)
     }
 
     #[test]
@@ -465,9 +304,7 @@ mod test {
         \n  Aggregate: groupBy=[[test.a]], aggr=[[MAX(test.b)]]\
         \n    TableScan: test";
 
-        assert_optimized_plan_eq(&plan, expected);
-
-        Ok(())
+        assert_optimized_plan_eq(&plan, expected)
     }
 
     #[test]
@@ -487,9 +324,7 @@ mod test {
         \n    Limit: skip=0, fetch=1000\
         \n      TableScan: test, fetch=1000";
 
-        assert_optimized_plan_eq(&plan, expected);
-
-        Ok(())
+        assert_optimized_plan_eq(&plan, expected)
     }
 
     #[test]
@@ -506,9 +341,7 @@ mod test {
         \n  Sort: test.a, fetch=10\
         \n    TableScan: test";
 
-        assert_optimized_plan_eq(&plan, expected);
-
-        Ok(())
+        assert_optimized_plan_eq(&plan, expected)
     }
 
     #[test]
@@ -525,9 +358,7 @@ mod test {
         \n  Sort: test.a, fetch=15\
         \n    TableScan: test";
 
-        assert_optimized_plan_eq(&plan, expected);
-
-        Ok(())
+        assert_optimized_plan_eq(&plan, expected)
     }
 
     #[test]
@@ -546,9 +377,7 @@ mod test {
         \n    Limit: skip=0, fetch=1000\
         \n      TableScan: test, fetch=1000";
 
-        assert_optimized_plan_eq(&plan, expected);
-
-        Ok(())
+        assert_optimized_plan_eq(&plan, expected)
     }
 
     #[test]
@@ -563,8 +392,7 @@ mod test {
         let expected = "Limit: skip=10, fetch=None\
         \n  TableScan: test";
 
-        assert_optimized_plan_eq(&plan, expected);
-        Ok(())
+        assert_optimized_plan_eq(&plan, expected)
     }
 
     #[test]
@@ -578,13 +406,11 @@ mod test {
 
         // Should push the limit down to table provider
         // When it has a select
-        let expected = "Limit: skip=10, fetch=1000\
-        \n  Projection: test.a\
+        let expected = "Projection: test.a\
+        \n  Limit: skip=10, fetch=1000\
         \n    TableScan: test, fetch=1010";
 
-        assert_optimized_plan_eq(&plan, expected);
-
-        Ok(())
+        assert_optimized_plan_eq(&plan, expected)
     }
 
     #[test]
@@ -597,14 +423,11 @@ mod test {
             .limit(10, None)?
             .build()?;
 
-        let expected = "Limit: skip=10, fetch=None\
-        \n  Limit: skip=0, fetch=1000\
-        \n    Projection: test.a\
-        \n      TableScan: test, fetch=1000";
-
-        assert_optimized_plan_eq(&plan, expected);
+        let expected = "Projection: test.a\
+        \n  Limit: skip=10, fetch=990\
+        \n    TableScan: test, fetch=1000";
 
-        Ok(())
+        assert_optimized_plan_eq(&plan, expected)
     }
 
     #[test]
@@ -617,14 +440,11 @@ mod test {
             .limit(0, Some(1000))?
             .build()?;
 
-        let expected = "Limit: skip=0, fetch=1000\
+        let expected = "Projection: test.a\
         \n  Limit: skip=10, fetch=1000\
-        \n    Projection: test.a\
-        \n      TableScan: test, fetch=1010";
-
-        assert_optimized_plan_eq(&plan, expected);
+        \n    TableScan: test, fetch=1010";
 
-        Ok(())
+        assert_optimized_plan_eq(&plan, expected)
     }
 
     #[test]
@@ -637,17 +457,10 @@ mod test {
             .limit(0, Some(10))?
             .build()?;
 
-        // Should push down the smallest limit
-        // Towards table scan
-        // This rule doesn't replace multiple limits
-        let expected = "Limit: skip=0, fetch=10\
-        \n  Limit: skip=0, fetch=10\
-        \n    Limit: skip=10, fetch=10\
-        \n      TableScan: test, fetch=20";
-
-        assert_optimized_plan_eq(&plan, expected);
+        let expected = "Limit: skip=10, fetch=10\
+        \n  TableScan: test, fetch=20";
 
-        Ok(())
+        assert_optimized_plan_eq(&plan, expected)
     }
 
     #[test]
@@ -664,9 +477,7 @@ mod test {
         \n  Aggregate: groupBy=[[test.a]], aggr=[[MAX(test.b)]]\
         \n    TableScan: test";
 
-        assert_optimized_plan_eq(&plan, expected);
-
-        Ok(())
+        assert_optimized_plan_eq(&plan, expected)
     }
 
     #[test]
@@ -686,9 +497,7 @@ mod test {
         \n    Limit: skip=0, fetch=1010\
         \n      TableScan: test, fetch=1010";
 
-        assert_optimized_plan_eq(&plan, expected);
-
-        Ok(())
+        assert_optimized_plan_eq(&plan, expected)
     }
 
     #[test]
@@ -712,9 +521,7 @@ mod test {
         \n    TableScan: test\
         \n    TableScan: test2";
 
-        assert_optimized_plan_eq(&plan, expected);
-
-        Ok(())
+        assert_optimized_plan_eq(&plan, expected)
     }
 
     #[test]
@@ -738,9 +545,7 @@ mod test {
         \n    TableScan: test\
         \n    TableScan: test2";
 
-        assert_optimized_plan_eq(&plan, expected);
-
-        Ok(())
+        assert_optimized_plan_eq(&plan, expected)
     }
 
     #[test]
@@ -769,9 +574,7 @@ mod test {
         \n    Projection: test2.a\
         \n      TableScan: test2";
 
-        assert_optimized_plan_eq(&outer_query, expected);
-
-        Ok(())
+        assert_optimized_plan_eq(&outer_query, expected)
     }
 
     #[test]
@@ -800,9 +603,7 @@ mod test {
         \n    Projection: test2.a\
         \n      TableScan: test2";
 
-        assert_optimized_plan_eq(&outer_query, expected);
-
-        Ok(())
+        assert_optimized_plan_eq(&outer_query, expected)
     }
 
     #[test]
@@ -823,12 +624,11 @@ mod test {
         // Limit pushdown Not supported in Join
         let expected = "Limit: skip=0, fetch=1000\
         \n  Left Join: test.a = test2.a\
-        \n    TableScan: test, fetch=1000\
+        \n    Limit: skip=0, fetch=1000\
+        \n      TableScan: test, fetch=1000\
         \n    TableScan: test2";
 
-        assert_optimized_plan_eq(&plan, expected);
-
-        Ok(())
+        assert_optimized_plan_eq(&plan, expected)
     }
 
     #[test]
@@ -849,12 +649,11 @@ mod test {
         // Limit pushdown Not supported in Join
         let expected = "Limit: skip=10, fetch=1000\
         \n  Left Join: test.a = test2.a\
-        \n    TableScan: test, fetch=1010\
+        \n    Limit: skip=0, fetch=1010\
+        \n      TableScan: test, fetch=1010\
         \n    TableScan: test2";
 
-        assert_optimized_plan_eq(&plan, expected);
-
-        Ok(())
+        assert_optimized_plan_eq(&plan, expected)
     }
 
     #[test]
@@ -876,11 +675,10 @@ mod test {
         let expected = "Limit: skip=0, fetch=1000\
         \n  Right Join: test.a = test2.a\
         \n    TableScan: test\
-        \n    TableScan: test2, fetch=1000";
-
-        assert_optimized_plan_eq(&plan, expected);
+        \n    Limit: skip=0, fetch=1000\
+        \n      TableScan: test2, fetch=1000";
 
-        Ok(())
+        assert_optimized_plan_eq(&plan, expected)
     }
 
     #[test]
@@ -902,11 +700,10 @@ mod test {
         let expected = "Limit: skip=10, fetch=1000\
         \n  Right Join: test.a = test2.a\
         \n    TableScan: test\
-        \n    TableScan: test2, fetch=1010";
-
-        assert_optimized_plan_eq(&plan, expected);
+        \n    Limit: skip=0, fetch=1010\
+        \n      TableScan: test2, fetch=1010";
 
-        Ok(())
+        assert_optimized_plan_eq(&plan, expected)
     }
 
     #[test]
@@ -921,12 +718,12 @@ mod test {
 
         let expected = "Limit: skip=0, fetch=1000\
         \n  CrossJoin:\
-        \n    TableScan: test, fetch=1000\
-        \n    TableScan: test2, fetch=1000";
-
-        assert_optimized_plan_eq(&plan, expected);
+        \n    Limit: skip=0, fetch=1000\
+        \n      TableScan: test, fetch=1000\
+        \n    Limit: skip=0, fetch=1000\
+        \n      TableScan: test2, fetch=1000";
 
-        Ok(())
+        assert_optimized_plan_eq(&plan, expected)
     }
 
     #[test]
@@ -941,11 +738,41 @@ mod test {
 
         let expected = "Limit: skip=1000, fetch=1000\
         \n  CrossJoin:\
-        \n    TableScan: test, fetch=2000\
-        \n    TableScan: test2, fetch=2000";
+        \n    Limit: skip=0, fetch=2000\
+        \n      TableScan: test, fetch=2000\
+        \n    Limit: skip=0, fetch=2000\
+        \n      TableScan: test2, fetch=2000";
 
-        assert_optimized_plan_eq(&plan, expected);
+        assert_optimized_plan_eq(&plan, expected)
+    }
 
-        Ok(())
+    #[test]
+    fn merge_limit_result_empty() -> Result<()> {
+        let scan = test_table_scan()?;
+
+        let plan = LogicalPlanBuilder::from(scan)
+            .limit(0, Some(1000))?
+            .limit(1000, None)?
+            .build()?;
+
+        let expected = "Limit: skip=1000, fetch=0\
+        \n  TableScan: test, fetch=1000";
+
+        assert_optimized_plan_eq(&plan, expected)
+    }
+
+    #[test]
+    fn skip_great_than_fetch() -> Result<()> {
+        let scan = test_table_scan()?;
+
+        let plan = LogicalPlanBuilder::from(scan)
+            .limit(0, Some(1))?
+            .limit(1000, None)?
+            .build()?;
+
+        let expected = "Limit: skip=1000, fetch=0\
+        \n  TableScan: test, fetch=1000";
+
+        assert_optimized_plan_eq(&plan, expected)
     }
 }

Reply via email to