This is an automated email from the ASF dual-hosted git repository.

jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new f1671bbcaa Minor: Move `LimitPushdown` tests to be in the same file as 
the code (#14076)
f1671bbcaa is described below

commit f1671bbcaa1ced5393fb0dda6b77fb66d0aa8a6c
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Jan 10 22:09:03 2025 -0500

    Minor: Move `LimitPushdown` tests to be in the same file as the code 
(#14076)
    
    * Minor: move limit_pushdown tests to be with their pass
    
    * Fix clippy
    
    * cleanup use
    
    * fmt
---
 .../tests/physical_optimizer/limit_pushdown.rs     | 490 ---------------------
 datafusion/core/tests/physical_optimizer/mod.rs    |   1 -
 .../physical-optimizer/src/limit_pushdown.rs       | 477 ++++++++++++++++++++
 3 files changed, 477 insertions(+), 491 deletions(-)

diff --git a/datafusion/core/tests/physical_optimizer/limit_pushdown.rs 
b/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
deleted file mode 100644
index 1b4c28d41d..0000000000
--- a/datafusion/core/tests/physical_optimizer/limit_pushdown.rs
+++ /dev/null
@@ -1,490 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use arrow_schema::{DataType, Field, Schema, SchemaRef, SortOptions};
-use datafusion_common::config::ConfigOptions;
-use datafusion_execution::{SendableRecordBatchStream, TaskContext};
-use datafusion_expr::Operator;
-use datafusion_physical_expr::expressions::BinaryExpr;
-use datafusion_physical_expr::expressions::{col, lit};
-use datafusion_physical_expr::Partitioning;
-use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
-use datafusion_physical_optimizer::limit_pushdown::LimitPushdown;
-use datafusion_physical_optimizer::PhysicalOptimizerRule;
-use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
-use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
-use datafusion_physical_plan::empty::EmptyExec;
-use datafusion_physical_plan::filter::FilterExec;
-use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
-use datafusion_physical_plan::projection::ProjectionExec;
-use datafusion_physical_plan::repartition::RepartitionExec;
-use datafusion_physical_plan::sorts::sort::SortExec;
-use 
datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
-use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec};
-use datafusion_physical_plan::{get_plan_string, ExecutionPlan, 
ExecutionPlanProperties};
-use std::sync::Arc;
-
-#[derive(Debug)]
-struct DummyStreamPartition {
-    schema: SchemaRef,
-}
-impl PartitionStream for DummyStreamPartition {
-    fn schema(&self) -> &SchemaRef {
-        &self.schema
-    }
-    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
-        unreachable!()
-    }
-}
-
-#[test]
-fn transforms_streaming_table_exec_into_fetching_version_when_skip_is_zero(
-) -> datafusion_common::Result<()> {
-    let schema = create_schema();
-    let streaming_table = streaming_table_exec(schema)?;
-    let global_limit = global_limit_exec(streaming_table, 0, Some(5));
-
-    let initial = get_plan_string(&global_limit);
-    let expected_initial = [
-        "GlobalLimitExec: skip=0, fetch=5",
-        "  StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], 
infinite_source=true"
-    ];
-    assert_eq!(initial, expected_initial);
-
-    let after_optimize =
-        LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
-
-    let expected = [
-        "StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], 
infinite_source=true, fetch=5"
-    ];
-    assert_eq!(get_plan_string(&after_optimize), expected);
-
-    Ok(())
-}
-
-#[test]
-fn 
transforms_streaming_table_exec_into_fetching_version_and_keeps_the_global_limit_when_skip_is_nonzero(
-) -> datafusion_common::Result<()> {
-    let schema = create_schema();
-    let streaming_table = streaming_table_exec(schema)?;
-    let global_limit = global_limit_exec(streaming_table, 2, Some(5));
-
-    let initial = get_plan_string(&global_limit);
-    let expected_initial = [
-        "GlobalLimitExec: skip=2, fetch=5",
-        "  StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], 
infinite_source=true"
-    ];
-    assert_eq!(initial, expected_initial);
-
-    let after_optimize =
-        LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
-
-    let expected = [
-        "GlobalLimitExec: skip=2, fetch=5",
-        "  StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], 
infinite_source=true, fetch=7"
-    ];
-    assert_eq!(get_plan_string(&after_optimize), expected);
-
-    Ok(())
-}
-
-#[test]
-fn 
transforms_coalesce_batches_exec_into_fetching_version_and_removes_local_limit(
-) -> datafusion_common::Result<()> {
-    let schema = create_schema();
-    let streaming_table = streaming_table_exec(schema.clone())?;
-    let repartition = repartition_exec(streaming_table)?;
-    let filter = filter_exec(schema, repartition)?;
-    let coalesce_batches = coalesce_batches_exec(filter);
-    let local_limit = local_limit_exec(coalesce_batches, 5);
-    let coalesce_partitions = coalesce_partitions_exec(local_limit);
-    let global_limit = global_limit_exec(coalesce_partitions, 0, Some(5));
-
-    let initial = get_plan_string(&global_limit);
-    let expected_initial = [
-        "GlobalLimitExec: skip=0, fetch=5",
-        "  CoalescePartitionsExec",
-        "    LocalLimitExec: fetch=5",
-        "      CoalesceBatchesExec: target_batch_size=8192",
-        "        FilterExec: c3@2 > 0",
-        "          RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-        "            StreamingTableExec: partition_sizes=1, projection=[c1, 
c2, c3], infinite_source=true"
-    ];
-    assert_eq!(initial, expected_initial);
-
-    let after_optimize =
-        LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
-
-    let expected = [
-        "GlobalLimitExec: skip=0, fetch=5",
-        "  CoalescePartitionsExec",
-        "    CoalesceBatchesExec: target_batch_size=8192, fetch=5",
-        "      FilterExec: c3@2 > 0",
-        "        RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-        "          StreamingTableExec: partition_sizes=1, projection=[c1, c2, 
c3], infinite_source=true"
-    ];
-    assert_eq!(get_plan_string(&after_optimize), expected);
-
-    Ok(())
-}
-
-#[test]
-fn pushes_global_limit_exec_through_projection_exec() -> 
datafusion_common::Result<()> {
-    let schema = create_schema();
-    let streaming_table = streaming_table_exec(schema.clone())?;
-    let filter = filter_exec(schema.clone(), streaming_table)?;
-    let projection = projection_exec(schema, filter)?;
-    let global_limit = global_limit_exec(projection, 0, Some(5));
-
-    let initial = get_plan_string(&global_limit);
-    let expected_initial = [
-        "GlobalLimitExec: skip=0, fetch=5",
-        "  ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]",
-        "    FilterExec: c3@2 > 0",
-        "      StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], 
infinite_source=true"
-    ];
-    assert_eq!(initial, expected_initial);
-
-    let after_optimize =
-        LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
-
-    let expected = [
-        "ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]",
-        "  GlobalLimitExec: skip=0, fetch=5",
-        "    FilterExec: c3@2 > 0",
-        "      StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], 
infinite_source=true"
-    ];
-    assert_eq!(get_plan_string(&after_optimize), expected);
-
-    Ok(())
-}
-
-#[test]
-fn 
pushes_global_limit_exec_through_projection_exec_and_transforms_coalesce_batches_exec_into_fetching_version(
-) -> datafusion_common::Result<()> {
-    let schema = create_schema();
-    let streaming_table = streaming_table_exec(schema.clone()).unwrap();
-    let coalesce_batches = coalesce_batches_exec(streaming_table);
-    let projection = projection_exec(schema, coalesce_batches)?;
-    let global_limit = global_limit_exec(projection, 0, Some(5));
-
-    let initial = get_plan_string(&global_limit);
-    let expected_initial = [
-        "GlobalLimitExec: skip=0, fetch=5",
-        "  ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]",
-        "    CoalesceBatchesExec: target_batch_size=8192",
-        "      StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], 
infinite_source=true"
-    ];
-
-    assert_eq!(initial, expected_initial);
-
-    let after_optimize =
-        LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
-
-    let expected = [
-        "ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]",
-        "  CoalesceBatchesExec: target_batch_size=8192, fetch=5",
-        "    StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], 
infinite_source=true"
-    ];
-    assert_eq!(get_plan_string(&after_optimize), expected);
-
-    Ok(())
-}
-
-#[test]
-fn pushes_global_limit_into_multiple_fetch_plans() -> 
datafusion_common::Result<()> {
-    let schema = create_schema();
-    let streaming_table = streaming_table_exec(schema.clone()).unwrap();
-    let coalesce_batches = coalesce_batches_exec(streaming_table);
-    let projection = projection_exec(schema.clone(), coalesce_batches)?;
-    let repartition = repartition_exec(projection)?;
-    let sort = sort_exec(
-        vec![PhysicalSortExpr {
-            expr: col("c1", &schema)?,
-            options: SortOptions::default(),
-        }],
-        repartition,
-    );
-    let spm = 
sort_preserving_merge_exec(sort.output_ordering().unwrap().to_vec(), sort);
-    let global_limit = global_limit_exec(spm, 0, Some(5));
-
-    let initial = get_plan_string(&global_limit);
-    let expected_initial = [
-        "GlobalLimitExec: skip=0, fetch=5",
-        "  SortPreservingMergeExec: [c1@0 ASC]",
-        "    SortExec: expr=[c1@0 ASC], preserve_partitioning=[false]",
-        "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-        "        ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]",
-        "          CoalesceBatchesExec: target_batch_size=8192",
-        "            StreamingTableExec: partition_sizes=1, projection=[c1, 
c2, c3], infinite_source=true"
-    ];
-
-    assert_eq!(initial, expected_initial);
-
-    let after_optimize =
-        LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
-
-    let expected = [
-        "SortPreservingMergeExec: [c1@0 ASC], fetch=5",
-        "  SortExec: TopK(fetch=5), expr=[c1@0 ASC], 
preserve_partitioning=[false]",
-        "    RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-        "      ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]",
-        "        CoalesceBatchesExec: target_batch_size=8192",
-        "          StreamingTableExec: partition_sizes=1, projection=[c1, c2, 
c3], infinite_source=true"
-    ];
-    assert_eq!(get_plan_string(&after_optimize), expected);
-
-    Ok(())
-}
-
-#[test]
-fn keeps_pushed_local_limit_exec_when_there_are_multiple_input_partitions(
-) -> datafusion_common::Result<()> {
-    let schema = create_schema();
-    let streaming_table = streaming_table_exec(schema.clone())?;
-    let repartition = repartition_exec(streaming_table)?;
-    let filter = filter_exec(schema, repartition)?;
-    let coalesce_partitions = coalesce_partitions_exec(filter);
-    let global_limit = global_limit_exec(coalesce_partitions, 0, Some(5));
-
-    let initial = get_plan_string(&global_limit);
-    let expected_initial = [
-        "GlobalLimitExec: skip=0, fetch=5",
-        "  CoalescePartitionsExec",
-        "    FilterExec: c3@2 > 0",
-        "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-        "        StreamingTableExec: partition_sizes=1, projection=[c1, c2, 
c3], infinite_source=true"
-    ];
-    assert_eq!(initial, expected_initial);
-
-    let after_optimize =
-        LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
-
-    let expected = [
-        "GlobalLimitExec: skip=0, fetch=5",
-        "  CoalescePartitionsExec",
-        "    FilterExec: c3@2 > 0",
-        "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-        "        StreamingTableExec: partition_sizes=1, projection=[c1, c2, 
c3], infinite_source=true"
-    ];
-    assert_eq!(get_plan_string(&after_optimize), expected);
-
-    Ok(())
-}
-
-#[test]
-fn merges_local_limit_with_local_limit() -> datafusion_common::Result<()> {
-    let schema = create_schema();
-    let empty_exec = empty_exec(schema);
-    let child_local_limit = local_limit_exec(empty_exec, 10);
-    let parent_local_limit = local_limit_exec(child_local_limit, 20);
-
-    let initial = get_plan_string(&parent_local_limit);
-    let expected_initial = [
-        "LocalLimitExec: fetch=20",
-        "  LocalLimitExec: fetch=10",
-        "    EmptyExec",
-    ];
-
-    assert_eq!(initial, expected_initial);
-
-    let after_optimize =
-        LimitPushdown::new().optimize(parent_local_limit, 
&ConfigOptions::new())?;
-
-    let expected = ["GlobalLimitExec: skip=0, fetch=10", "  EmptyExec"];
-    assert_eq!(get_plan_string(&after_optimize), expected);
-
-    Ok(())
-}
-
-#[test]
-fn merges_global_limit_with_global_limit() -> datafusion_common::Result<()> {
-    let schema = create_schema();
-    let empty_exec = empty_exec(schema);
-    let child_global_limit = global_limit_exec(empty_exec, 10, Some(30));
-    let parent_global_limit = global_limit_exec(child_global_limit, 10, 
Some(20));
-
-    let initial = get_plan_string(&parent_global_limit);
-    let expected_initial = [
-        "GlobalLimitExec: skip=10, fetch=20",
-        "  GlobalLimitExec: skip=10, fetch=30",
-        "    EmptyExec",
-    ];
-
-    assert_eq!(initial, expected_initial);
-
-    let after_optimize =
-        LimitPushdown::new().optimize(parent_global_limit, 
&ConfigOptions::new())?;
-
-    let expected = ["GlobalLimitExec: skip=20, fetch=20", "  EmptyExec"];
-    assert_eq!(get_plan_string(&after_optimize), expected);
-
-    Ok(())
-}
-
-#[test]
-fn merges_global_limit_with_local_limit() -> datafusion_common::Result<()> {
-    let schema = create_schema();
-    let empty_exec = empty_exec(schema);
-    let local_limit = local_limit_exec(empty_exec, 40);
-    let global_limit = global_limit_exec(local_limit, 20, Some(30));
-
-    let initial = get_plan_string(&global_limit);
-    let expected_initial = [
-        "GlobalLimitExec: skip=20, fetch=30",
-        "  LocalLimitExec: fetch=40",
-        "    EmptyExec",
-    ];
-
-    assert_eq!(initial, expected_initial);
-
-    let after_optimize =
-        LimitPushdown::new().optimize(global_limit, &ConfigOptions::new())?;
-
-    let expected = ["GlobalLimitExec: skip=20, fetch=20", "  EmptyExec"];
-    assert_eq!(get_plan_string(&after_optimize), expected);
-
-    Ok(())
-}
-
-#[test]
-fn merges_local_limit_with_global_limit() -> datafusion_common::Result<()> {
-    let schema = create_schema();
-    let empty_exec = empty_exec(schema);
-    let global_limit = global_limit_exec(empty_exec, 20, Some(30));
-    let local_limit = local_limit_exec(global_limit, 20);
-
-    let initial = get_plan_string(&local_limit);
-    let expected_initial = [
-        "LocalLimitExec: fetch=20",
-        "  GlobalLimitExec: skip=20, fetch=30",
-        "    EmptyExec",
-    ];
-
-    assert_eq!(initial, expected_initial);
-
-    let after_optimize =
-        LimitPushdown::new().optimize(local_limit, &ConfigOptions::new())?;
-
-    let expected = ["GlobalLimitExec: skip=20, fetch=20", "  EmptyExec"];
-    assert_eq!(get_plan_string(&after_optimize), expected);
-
-    Ok(())
-}
-
-fn create_schema() -> SchemaRef {
-    Arc::new(Schema::new(vec![
-        Field::new("c1", DataType::Int32, true),
-        Field::new("c2", DataType::Int32, true),
-        Field::new("c3", DataType::Int32, true),
-    ]))
-}
-
-fn streaming_table_exec(
-    schema: SchemaRef,
-) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
-    Ok(Arc::new(StreamingTableExec::try_new(
-        schema.clone(),
-        vec![Arc::new(DummyStreamPartition { schema }) as _],
-        None,
-        None,
-        true,
-        None,
-    )?))
-}
-
-fn global_limit_exec(
-    input: Arc<dyn ExecutionPlan>,
-    skip: usize,
-    fetch: Option<usize>,
-) -> Arc<dyn ExecutionPlan> {
-    Arc::new(GlobalLimitExec::new(input, skip, fetch))
-}
-
-fn local_limit_exec(
-    input: Arc<dyn ExecutionPlan>,
-    fetch: usize,
-) -> Arc<dyn ExecutionPlan> {
-    Arc::new(LocalLimitExec::new(input, fetch))
-}
-
-fn sort_exec(
-    sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
-    input: Arc<dyn ExecutionPlan>,
-) -> Arc<dyn ExecutionPlan> {
-    let sort_exprs = sort_exprs.into_iter().collect();
-    Arc::new(SortExec::new(sort_exprs, input))
-}
-
-fn sort_preserving_merge_exec(
-    sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
-    input: Arc<dyn ExecutionPlan>,
-) -> Arc<dyn ExecutionPlan> {
-    let sort_exprs = sort_exprs.into_iter().collect();
-    Arc::new(SortPreservingMergeExec::new(sort_exprs, input))
-}
-
-fn projection_exec(
-    schema: SchemaRef,
-    input: Arc<dyn ExecutionPlan>,
-) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
-    Ok(Arc::new(ProjectionExec::try_new(
-        vec![
-            (col("c1", schema.as_ref()).unwrap(), "c1".to_string()),
-            (col("c2", schema.as_ref()).unwrap(), "c2".to_string()),
-            (col("c3", schema.as_ref()).unwrap(), "c3".to_string()),
-        ],
-        input,
-    )?))
-}
-
-fn filter_exec(
-    schema: SchemaRef,
-    input: Arc<dyn ExecutionPlan>,
-) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
-    Ok(Arc::new(FilterExec::try_new(
-        Arc::new(BinaryExpr::new(
-            col("c3", schema.as_ref()).unwrap(),
-            Operator::Gt,
-            lit(0),
-        )),
-        input,
-    )?))
-}
-
-fn coalesce_batches_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn 
ExecutionPlan> {
-    Arc::new(CoalesceBatchesExec::new(input, 8192))
-}
-
-fn coalesce_partitions_exec(
-    local_limit: Arc<dyn ExecutionPlan>,
-) -> Arc<dyn ExecutionPlan> {
-    Arc::new(CoalescePartitionsExec::new(local_limit))
-}
-
-fn repartition_exec(
-    streaming_table: Arc<dyn ExecutionPlan>,
-) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
-    Ok(Arc::new(RepartitionExec::try_new(
-        streaming_table,
-        Partitioning::RoundRobinBatch(8),
-    )?))
-}
-
-fn empty_exec(schema: SchemaRef) -> Arc<dyn ExecutionPlan> {
-    Arc::new(EmptyExec::new(schema))
-}
diff --git a/datafusion/core/tests/physical_optimizer/mod.rs 
b/datafusion/core/tests/physical_optimizer/mod.rs
index c06783aa02..efe3778911 100644
--- a/datafusion/core/tests/physical_optimizer/mod.rs
+++ b/datafusion/core/tests/physical_optimizer/mod.rs
@@ -16,6 +16,5 @@
 // under the License.
 
 mod combine_partial_final_agg;
-mod limit_pushdown;
 mod limited_distinct_aggregation;
 mod test_util;
diff --git a/datafusion/physical-optimizer/src/limit_pushdown.rs 
b/datafusion/physical-optimizer/src/limit_pushdown.rs
index 8f392b6830..7a44b2e90d 100644
--- a/datafusion/physical-optimizer/src/limit_pushdown.rs
+++ b/datafusion/physical-optimizer/src/limit_pushdown.rs
@@ -339,3 +339,480 @@ fn add_global_limit(
 }
 
 // See tests in datafusion/core/tests/physical_optimizer
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use arrow::compute::SortOptions;
+    use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+    use datafusion_common::config::ConfigOptions;
+    use datafusion_execution::{SendableRecordBatchStream, TaskContext};
+    use datafusion_expr::Operator;
+    use datafusion_physical_expr::expressions::BinaryExpr;
+    use datafusion_physical_expr::expressions::{col, lit};
+    use datafusion_physical_expr::{Partitioning, PhysicalSortExpr};
+    use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
+    use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
+    use datafusion_physical_plan::empty::EmptyExec;
+    use datafusion_physical_plan::filter::FilterExec;
+    use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
+    use datafusion_physical_plan::projection::ProjectionExec;
+    use datafusion_physical_plan::repartition::RepartitionExec;
+    use datafusion_physical_plan::sorts::sort::SortExec;
+    use 
datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
+    use datafusion_physical_plan::streaming::{PartitionStream, 
StreamingTableExec};
+    use datafusion_physical_plan::{
+        get_plan_string, ExecutionPlan, ExecutionPlanProperties,
+    };
+    use std::sync::Arc;
+
+    #[derive(Debug)]
+    struct DummyStreamPartition {
+        schema: SchemaRef,
+    }
+    impl PartitionStream for DummyStreamPartition {
+        fn schema(&self) -> &SchemaRef {
+            &self.schema
+        }
+        fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream 
{
+            unreachable!()
+        }
+    }
+
+    #[test]
+    fn transforms_streaming_table_exec_into_fetching_version_when_skip_is_zero(
+    ) -> Result<()> {
+        let schema = create_schema();
+        let streaming_table = streaming_table_exec(schema)?;
+        let global_limit = global_limit_exec(streaming_table, 0, Some(5));
+
+        let initial = get_plan_string(&global_limit);
+        let expected_initial = [
+            "GlobalLimitExec: skip=0, fetch=5",
+            "  StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], 
infinite_source=true"
+        ];
+        assert_eq!(initial, expected_initial);
+
+        let after_optimize =
+            LimitPushdown::new().optimize(global_limit, 
&ConfigOptions::new())?;
+
+        let expected = [
+            "StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], 
infinite_source=true, fetch=5"
+        ];
+        assert_eq!(get_plan_string(&after_optimize), expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn 
transforms_streaming_table_exec_into_fetching_version_and_keeps_the_global_limit_when_skip_is_nonzero(
+    ) -> Result<()> {
+        let schema = create_schema();
+        let streaming_table = streaming_table_exec(schema)?;
+        let global_limit = global_limit_exec(streaming_table, 2, Some(5));
+
+        let initial = get_plan_string(&global_limit);
+        let expected_initial = [
+            "GlobalLimitExec: skip=2, fetch=5",
+            "  StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], 
infinite_source=true"
+        ];
+        assert_eq!(initial, expected_initial);
+
+        let after_optimize =
+            LimitPushdown::new().optimize(global_limit, 
&ConfigOptions::new())?;
+
+        let expected = [
+            "GlobalLimitExec: skip=2, fetch=5",
+            "  StreamingTableExec: partition_sizes=1, projection=[c1, c2, c3], 
infinite_source=true, fetch=7"
+        ];
+        assert_eq!(get_plan_string(&after_optimize), expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn 
transforms_coalesce_batches_exec_into_fetching_version_and_removes_local_limit(
+    ) -> Result<()> {
+        let schema = create_schema();
+        let streaming_table = streaming_table_exec(Arc::clone(&schema))?;
+        let repartition = repartition_exec(streaming_table)?;
+        let filter = filter_exec(schema, repartition)?;
+        let coalesce_batches = coalesce_batches_exec(filter);
+        let local_limit = local_limit_exec(coalesce_batches, 5);
+        let coalesce_partitions = coalesce_partitions_exec(local_limit);
+        let global_limit = global_limit_exec(coalesce_partitions, 0, Some(5));
+
+        let initial = get_plan_string(&global_limit);
+        let expected_initial = [
+            "GlobalLimitExec: skip=0, fetch=5",
+            "  CoalescePartitionsExec",
+            "    LocalLimitExec: fetch=5",
+            "      CoalesceBatchesExec: target_batch_size=8192",
+            "        FilterExec: c3@2 > 0",
+            "          RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "            StreamingTableExec: partition_sizes=1, 
projection=[c1, c2, c3], infinite_source=true"
+        ];
+        assert_eq!(initial, expected_initial);
+
+        let after_optimize =
+            LimitPushdown::new().optimize(global_limit, 
&ConfigOptions::new())?;
+
+        let expected = [
+            "GlobalLimitExec: skip=0, fetch=5",
+            "  CoalescePartitionsExec",
+            "    CoalesceBatchesExec: target_batch_size=8192, fetch=5",
+            "      FilterExec: c3@2 > 0",
+            "        RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "          StreamingTableExec: partition_sizes=1, projection=[c1, 
c2, c3], infinite_source=true"
+        ];
+        assert_eq!(get_plan_string(&after_optimize), expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn pushes_global_limit_exec_through_projection_exec() -> Result<()> {
+        let schema = create_schema();
+        let streaming_table = streaming_table_exec(Arc::clone(&schema))?;
+        let filter = filter_exec(Arc::clone(&schema), streaming_table)?;
+        let projection = projection_exec(schema, filter)?;
+        let global_limit = global_limit_exec(projection, 0, Some(5));
+
+        let initial = get_plan_string(&global_limit);
+        let expected_initial = [
+            "GlobalLimitExec: skip=0, fetch=5",
+            "  ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]",
+            "    FilterExec: c3@2 > 0",
+            "      StreamingTableExec: partition_sizes=1, projection=[c1, c2, 
c3], infinite_source=true"
+        ];
+        assert_eq!(initial, expected_initial);
+
+        let after_optimize =
+            LimitPushdown::new().optimize(global_limit, 
&ConfigOptions::new())?;
+
+        let expected = [
+            "ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]",
+            "  GlobalLimitExec: skip=0, fetch=5",
+            "    FilterExec: c3@2 > 0",
+            "      StreamingTableExec: partition_sizes=1, projection=[c1, c2, 
c3], infinite_source=true"
+        ];
+        assert_eq!(get_plan_string(&after_optimize), expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn 
pushes_global_limit_exec_through_projection_exec_and_transforms_coalesce_batches_exec_into_fetching_version(
+    ) -> Result<()> {
+        let schema = create_schema();
+        let streaming_table = 
streaming_table_exec(Arc::clone(&schema)).unwrap();
+        let coalesce_batches = coalesce_batches_exec(streaming_table);
+        let projection = projection_exec(schema, coalesce_batches)?;
+        let global_limit = global_limit_exec(projection, 0, Some(5));
+
+        let initial = get_plan_string(&global_limit);
+        let expected_initial = [
+            "GlobalLimitExec: skip=0, fetch=5",
+            "  ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]",
+            "    CoalesceBatchesExec: target_batch_size=8192",
+            "      StreamingTableExec: partition_sizes=1, projection=[c1, c2, 
c3], infinite_source=true"
+        ];
+
+        assert_eq!(initial, expected_initial);
+
+        let after_optimize =
+            LimitPushdown::new().optimize(global_limit, 
&ConfigOptions::new())?;
+
+        let expected = [
+            "ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]",
+            "  CoalesceBatchesExec: target_batch_size=8192, fetch=5",
+            "    StreamingTableExec: partition_sizes=1, projection=[c1, c2, 
c3], infinite_source=true"
+        ];
+        assert_eq!(get_plan_string(&after_optimize), expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn pushes_global_limit_into_multiple_fetch_plans() -> Result<()> {
+        let schema = create_schema();
+        let streaming_table = 
streaming_table_exec(Arc::clone(&schema)).unwrap();
+        let coalesce_batches = coalesce_batches_exec(streaming_table);
+        let projection = projection_exec(Arc::clone(&schema), 
coalesce_batches)?;
+        let repartition = repartition_exec(projection)?;
+        let sort = sort_exec(
+            vec![PhysicalSortExpr {
+                expr: col("c1", &schema)?,
+                options: SortOptions::default(),
+            }],
+            repartition,
+        );
+        let spm =
+            
sort_preserving_merge_exec(sort.output_ordering().unwrap().to_vec(), sort);
+        let global_limit = global_limit_exec(spm, 0, Some(5));
+
+        let initial = get_plan_string(&global_limit);
+        let expected_initial = [
+            "GlobalLimitExec: skip=0, fetch=5",
+            "  SortPreservingMergeExec: [c1@0 ASC]",
+            "    SortExec: expr=[c1@0 ASC], preserve_partitioning=[false]",
+            "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "        ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as 
c3]",
+            "          CoalesceBatchesExec: target_batch_size=8192",
+            "            StreamingTableExec: partition_sizes=1, 
projection=[c1, c2, c3], infinite_source=true"
+        ];
+
+        assert_eq!(initial, expected_initial);
+
+        let after_optimize =
+            LimitPushdown::new().optimize(global_limit, 
&ConfigOptions::new())?;
+
+        let expected = [
+            "SortPreservingMergeExec: [c1@0 ASC], fetch=5",
+            "  SortExec: TopK(fetch=5), expr=[c1@0 ASC], 
preserve_partitioning=[false]",
+            "    RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "      ProjectionExec: expr=[c1@0 as c1, c2@1 as c2, c3@2 as c3]",
+            "        CoalesceBatchesExec: target_batch_size=8192",
+            "          StreamingTableExec: partition_sizes=1, projection=[c1, 
c2, c3], infinite_source=true"
+        ];
+        assert_eq!(get_plan_string(&after_optimize), expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn keeps_pushed_local_limit_exec_when_there_are_multiple_input_partitions(
+    ) -> Result<()> {
+        let schema = create_schema();
+        let streaming_table = streaming_table_exec(Arc::clone(&schema))?;
+        let repartition = repartition_exec(streaming_table)?;
+        let filter = filter_exec(schema, repartition)?;
+        let coalesce_partitions = coalesce_partitions_exec(filter);
+        let global_limit = global_limit_exec(coalesce_partitions, 0, Some(5));
+
+        let initial = get_plan_string(&global_limit);
+        let expected_initial = [
+            "GlobalLimitExec: skip=0, fetch=5",
+            "  CoalescePartitionsExec",
+            "    FilterExec: c3@2 > 0",
+            "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "        StreamingTableExec: partition_sizes=1, projection=[c1, 
c2, c3], infinite_source=true"
+        ];
+        assert_eq!(initial, expected_initial);
+
+        let after_optimize =
+            LimitPushdown::new().optimize(global_limit, 
&ConfigOptions::new())?;
+
+        let expected = [
+            "GlobalLimitExec: skip=0, fetch=5",
+            "  CoalescePartitionsExec",
+            "    FilterExec: c3@2 > 0",
+            "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
+            "        StreamingTableExec: partition_sizes=1, projection=[c1, 
c2, c3], infinite_source=true"
+        ];
+        assert_eq!(get_plan_string(&after_optimize), expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn merges_local_limit_with_local_limit() -> Result<()> {
+        let schema = create_schema();
+        let empty_exec = empty_exec(schema);
+        let child_local_limit = local_limit_exec(empty_exec, 10);
+        let parent_local_limit = local_limit_exec(child_local_limit, 20);
+
+        let initial = get_plan_string(&parent_local_limit);
+        let expected_initial = [
+            "LocalLimitExec: fetch=20",
+            "  LocalLimitExec: fetch=10",
+            "    EmptyExec",
+        ];
+
+        assert_eq!(initial, expected_initial);
+
+        let after_optimize =
+            LimitPushdown::new().optimize(parent_local_limit, 
&ConfigOptions::new())?;
+
+        let expected = ["GlobalLimitExec: skip=0, fetch=10", "  EmptyExec"];
+        assert_eq!(get_plan_string(&after_optimize), expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn merges_global_limit_with_global_limit() -> Result<()> {
+        let schema = create_schema();
+        let empty_exec = empty_exec(schema);
+        let child_global_limit = global_limit_exec(empty_exec, 10, Some(30));
+        let parent_global_limit = global_limit_exec(child_global_limit, 10, 
Some(20));
+
+        let initial = get_plan_string(&parent_global_limit);
+        let expected_initial = [
+            "GlobalLimitExec: skip=10, fetch=20",
+            "  GlobalLimitExec: skip=10, fetch=30",
+            "    EmptyExec",
+        ];
+
+        assert_eq!(initial, expected_initial);
+
+        let after_optimize =
+            LimitPushdown::new().optimize(parent_global_limit, 
&ConfigOptions::new())?;
+
+        let expected = ["GlobalLimitExec: skip=20, fetch=20", "  EmptyExec"];
+        assert_eq!(get_plan_string(&after_optimize), expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn merges_global_limit_with_local_limit() -> Result<()> {
+        let schema = create_schema();
+        let empty_exec = empty_exec(schema);
+        let local_limit = local_limit_exec(empty_exec, 40);
+        let global_limit = global_limit_exec(local_limit, 20, Some(30));
+
+        let initial = get_plan_string(&global_limit);
+        let expected_initial = [
+            "GlobalLimitExec: skip=20, fetch=30",
+            "  LocalLimitExec: fetch=40",
+            "    EmptyExec",
+        ];
+
+        assert_eq!(initial, expected_initial);
+
+        let after_optimize =
+            LimitPushdown::new().optimize(global_limit, 
&ConfigOptions::new())?;
+
+        let expected = ["GlobalLimitExec: skip=20, fetch=20", "  EmptyExec"];
+        assert_eq!(get_plan_string(&after_optimize), expected);
+
+        Ok(())
+    }
+
+    #[test]
+    fn merges_local_limit_with_global_limit() -> Result<()> {
+        let schema = create_schema();
+        let empty_exec = empty_exec(schema);
+        let global_limit = global_limit_exec(empty_exec, 20, Some(30));
+        let local_limit = local_limit_exec(global_limit, 20);
+
+        let initial = get_plan_string(&local_limit);
+        let expected_initial = [
+            "LocalLimitExec: fetch=20",
+            "  GlobalLimitExec: skip=20, fetch=30",
+            "    EmptyExec",
+        ];
+
+        assert_eq!(initial, expected_initial);
+
+        let after_optimize =
+            LimitPushdown::new().optimize(local_limit, &ConfigOptions::new())?;
+
+        let expected = ["GlobalLimitExec: skip=20, fetch=20", "  EmptyExec"];
+        assert_eq!(get_plan_string(&after_optimize), expected);
+
+        Ok(())
+    }
+
+    fn create_schema() -> SchemaRef {
+        Arc::new(Schema::new(vec![
+            Field::new("c1", DataType::Int32, true),
+            Field::new("c2", DataType::Int32, true),
+            Field::new("c3", DataType::Int32, true),
+        ]))
+    }
+
+    fn streaming_table_exec(schema: SchemaRef) -> Result<Arc<dyn 
ExecutionPlan>> {
+        Ok(Arc::new(StreamingTableExec::try_new(
+            Arc::clone(&schema),
+            vec![Arc::new(DummyStreamPartition { schema }) as _],
+            None,
+            None,
+            true,
+            None,
+        )?))
+    }
+
+    fn global_limit_exec(
+        input: Arc<dyn ExecutionPlan>,
+        skip: usize,
+        fetch: Option<usize>,
+    ) -> Arc<dyn ExecutionPlan> {
+        Arc::new(GlobalLimitExec::new(input, skip, fetch))
+    }
+
+    fn local_limit_exec(
+        input: Arc<dyn ExecutionPlan>,
+        fetch: usize,
+    ) -> Arc<dyn ExecutionPlan> {
+        Arc::new(LocalLimitExec::new(input, fetch))
+    }
+
+    fn sort_exec(
+        sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
+        input: Arc<dyn ExecutionPlan>,
+    ) -> Arc<dyn ExecutionPlan> {
+        let sort_exprs = sort_exprs.into_iter().collect();
+        Arc::new(SortExec::new(sort_exprs, input))
+    }
+
+    fn sort_preserving_merge_exec(
+        sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
+        input: Arc<dyn ExecutionPlan>,
+    ) -> Arc<dyn ExecutionPlan> {
+        let sort_exprs = sort_exprs.into_iter().collect();
+        Arc::new(SortPreservingMergeExec::new(sort_exprs, input))
+    }
+
+    fn projection_exec(
+        schema: SchemaRef,
+        input: Arc<dyn ExecutionPlan>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(ProjectionExec::try_new(
+            vec![
+                (col("c1", schema.as_ref()).unwrap(), "c1".to_string()),
+                (col("c2", schema.as_ref()).unwrap(), "c2".to_string()),
+                (col("c3", schema.as_ref()).unwrap(), "c3".to_string()),
+            ],
+            input,
+        )?))
+    }
+
+    fn filter_exec(
+        schema: SchemaRef,
+        input: Arc<dyn ExecutionPlan>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(FilterExec::try_new(
+            Arc::new(BinaryExpr::new(
+                col("c3", schema.as_ref()).unwrap(),
+                Operator::Gt,
+                lit(0),
+            )),
+            input,
+        )?))
+    }
+
+    fn coalesce_batches_exec(input: Arc<dyn ExecutionPlan>) -> Arc<dyn 
ExecutionPlan> {
+        Arc::new(CoalesceBatchesExec::new(input, 8192))
+    }
+
+    fn coalesce_partitions_exec(
+        local_limit: Arc<dyn ExecutionPlan>,
+    ) -> Arc<dyn ExecutionPlan> {
+        Arc::new(CoalescePartitionsExec::new(local_limit))
+    }
+
+    fn repartition_exec(
+        streaming_table: Arc<dyn ExecutionPlan>,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        Ok(Arc::new(RepartitionExec::try_new(
+            streaming_table,
+            Partitioning::RoundRobinBatch(8),
+        )?))
+    }
+
+    fn empty_exec(schema: SchemaRef) -> Arc<dyn ExecutionPlan> {
+        Arc::new(EmptyExec::new(schema))
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to