This is an automated email from the ASF dual-hosted git repository.

berkay pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 295ffb41f5 test: Add plan execution during tests for bounded source (#14013)
295ffb41f5 is described below

commit 295ffb41f562f2c58b22ce36928df3f85f7fe09a
Author: Aleksey Kirilishin <[email protected]>
AuthorDate: Fri Jan 10 13:19:23 2025 +0300

    test: Add plan execution during tests for bounded source (#14013)
---
 .../replace_with_order_preserving_variants.rs      | 251 +++++++++++++--------
 1 file changed, 151 insertions(+), 100 deletions(-)

diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
index 96b2454fa3..9f5afc7abc 100644
--- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
+++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs
@@ -285,9 +285,7 @@ pub(crate) fn replace_with_order_preserving_variants(
 mod tests {
     use super::*;
 
-    use crate::datasource::file_format::file_compression_type::FileCompressionType;
-    use crate::datasource::listing::PartitionedFile;
-    use crate::datasource::physical_plan::{CsvExec, FileScanConfig};
+    use crate::execution::TaskContext;
     use crate::physical_optimizer::test_utils::check_integrity;
     use crate::physical_plan::coalesce_batches::CoalesceBatchesExec;
     use crate::physical_plan::filter::FilterExec;
@@ -296,18 +294,24 @@ mod tests {
     use crate::physical_plan::{
         displayable, get_plan_string, ExecutionPlan, Partitioning,
     };
-    use crate::prelude::SessionConfig;
+    use crate::prelude::{SessionConfig, SessionContext};
     use crate::test::TestStreamPartition;
 
+    use arrow::array::{ArrayRef, Int32Array};
     use arrow::compute::SortOptions;
     use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+    use arrow::record_batch::RecordBatch;
     use datafusion_common::tree_node::{TransformedResult, TreeNode};
     use datafusion_common::Result;
-    use datafusion_execution::object_store::ObjectStoreUrl;
     use datafusion_expr::{JoinType, Operator};
     use datafusion_physical_expr::expressions::{self, col, Column};
     use datafusion_physical_expr::PhysicalSortExpr;
+    use datafusion_physical_plan::collect;
+    use datafusion_physical_plan::memory::MemoryExec;
     use datafusion_physical_plan::streaming::StreamingTableExec;
+    use object_store::memory::InMemory;
+    use object_store::ObjectStore;
+    use url::Url;
 
     use rstest::rstest;
 
@@ -328,20 +332,24 @@ mod tests {
     /// * `$PLAN`: The plan to optimize.
     /// * `$SOURCE_UNBOUNDED`: Whether the given plan contains an unbounded source.
     macro_rules! assert_optimized_in_all_boundedness_situations {
-        ($EXPECTED_UNBOUNDED_PLAN_LINES: expr,  $EXPECTED_BOUNDED_PLAN_LINES: 
expr, $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES: expr, 
$EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES: expr, 
$EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, 
$SOURCE_UNBOUNDED: expr) => {
+        ($EXPECTED_UNBOUNDED_PLAN_LINES: expr,  $EXPECTED_BOUNDED_PLAN_LINES: 
expr, $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES: expr, 
$EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES: expr, 
$EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, 
$SOURCE_UNBOUNDED: expr, $PREFER_EXISTING_SORT: expr) => {
             if $SOURCE_UNBOUNDED {
                 assert_optimized_prefer_sort_on_off!(
                     $EXPECTED_UNBOUNDED_PLAN_LINES,
                     $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES,
                     $EXPECTED_UNBOUNDED_OPTIMIZED_PLAN_LINES,
-                    $PLAN
+                    $PLAN,
+                    $PREFER_EXISTING_SORT,
+                    $SOURCE_UNBOUNDED
                 );
             } else {
                 assert_optimized_prefer_sort_on_off!(
                     $EXPECTED_BOUNDED_PLAN_LINES,
                     $EXPECTED_BOUNDED_OPTIMIZED_PLAN_LINES,
                     $EXPECTED_BOUNDED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES,
-                    $PLAN
+                    $PLAN,
+                    $PREFER_EXISTING_SORT,
+                    $SOURCE_UNBOUNDED
                 );
             }
         };
@@ -359,19 +367,24 @@ mod tests {
     ///   the flag `prefer_existing_sort` is `true`.
     /// * `$PLAN`: The plan to optimize.
     macro_rules! assert_optimized_prefer_sort_on_off {
-        ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, 
$EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr) => {
-            assert_optimized!(
-                $EXPECTED_PLAN_LINES,
-                $EXPECTED_OPTIMIZED_PLAN_LINES,
-                $PLAN.clone(),
-                false
-            );
-            assert_optimized!(
-                $EXPECTED_PLAN_LINES,
-                $EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES,
-                $PLAN,
-                true
-            );
+        ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, 
$EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES: expr, $PLAN: expr, 
$PREFER_EXISTING_SORT: expr, $SOURCE_UNBOUNDED: expr) => {
+            if $PREFER_EXISTING_SORT {
+                assert_optimized!(
+                    $EXPECTED_PLAN_LINES,
+                    $EXPECTED_PREFER_SORT_ON_OPTIMIZED_PLAN_LINES,
+                    $PLAN,
+                    $PREFER_EXISTING_SORT,
+                    $SOURCE_UNBOUNDED
+                );
+            } else {
+                assert_optimized!(
+                    $EXPECTED_PLAN_LINES,
+                    $EXPECTED_OPTIMIZED_PLAN_LINES,
+                    $PLAN,
+                    $PREFER_EXISTING_SORT,
+                    $SOURCE_UNBOUNDED
+                );
+            }
         };
     }
 
@@ -385,7 +398,7 @@ mod tests {
     /// * `$PLAN`: The plan to optimize.
     /// * `$PREFER_EXISTING_SORT`: Value of the `prefer_existing_sort` flag.
     macro_rules! assert_optimized {
-        ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, 
$PLAN: expr, $PREFER_EXISTING_SORT: expr) => {
+        ($EXPECTED_PLAN_LINES: expr, $EXPECTED_OPTIMIZED_PLAN_LINES: expr, 
$PLAN: expr, $PREFER_EXISTING_SORT: expr, $SOURCE_UNBOUNDED: expr) => {
             let physical_plan = $PLAN;
             let formatted = 
displayable(physical_plan.as_ref()).indent(true).to_string();
             let actual: Vec<&str> = formatted.trim().lines().collect();
@@ -412,6 +425,19 @@ mod tests {
                 expected_optimized_lines, actual,
                 "\n**Optimized Plan 
Mismatch\n\nexpected:\n\n{expected_optimized_lines:#?}\nactual:\n\n{actual:#?}\n\n"
             );
+
+            if !$SOURCE_UNBOUNDED {
+                let ctx = SessionContext::new();
+                let object_store = InMemory::new();
+                object_store.put(&object_store::path::Path::from("file_path"), 
bytes::Bytes::from("").into()).await?;
+                ctx.register_object_store(&Url::parse("test://").unwrap(), 
Arc::new(object_store));
+                let task_ctx = Arc::new(TaskContext::from(&ctx));
+                let res = collect(optimized_physical_plan, task_ctx).await;
+                assert!(
+                    res.is_ok(),
+                    "Some errors occurred while executing the optimized 
physical plan: {:?}", res.unwrap_err()
+                );
+            }
         };
     }
 
@@ -420,13 +446,14 @@ mod tests {
     // Searches for a simple sort and a repartition just after it, the second repartition with 1 input partition should not be affected
     async fn test_replace_multiple_input_repartition_1(
         #[values(false, true)] source_unbounded: bool,
+        #[values(false, true)] prefer_existing_sort: bool,
     ) -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
         let source = if source_unbounded {
             stream_exec_ordered(&schema, sort_exprs)
         } else {
-            csv_exec_sorted(&schema, sort_exprs)
+            memory_exec_sorted(&schema, sort_exprs)
         };
         let repartition = 
repartition_exec_hash(repartition_exec_round_robin(source));
         let sort = sort_exec(vec![sort_expr("a", &schema)], repartition, true);
@@ -447,7 +474,7 @@ mod tests {
             "  SortExec: expr=[a@0 ASC NULLS LAST], 
preserve_partitioning=[true]",
             "    RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "        CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "        MemoryExec: partitions=1, partition_sizes=[1], 
output_ordering=a@0 ASC NULLS LAST",
         ];
 
         // Expected unbounded result (same for with and without flag)
@@ -464,13 +491,13 @@ mod tests {
             "  SortExec: expr=[a@0 ASC NULLS LAST], 
preserve_partitioning=[true]",
             "    RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "        CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "        MemoryExec: partitions=1, partition_sizes=[1], 
output_ordering=a@0 ASC NULLS LAST",
         ];
         let expected_optimized_bounded_sort_preserve = [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
             "  RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
             "    RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "      CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "      MemoryExec: partitions=1, partition_sizes=[1], 
output_ordering=a@0 ASC NULLS LAST",
         ];
         assert_optimized_in_all_boundedness_situations!(
             expected_input_unbounded,
@@ -479,7 +506,8 @@ mod tests {
             expected_optimized_bounded,
             expected_optimized_bounded_sort_preserve,
             physical_plan,
-            source_unbounded
+            source_unbounded,
+            prefer_existing_sort
         );
         Ok(())
     }
@@ -488,13 +516,14 @@ mod tests {
     #[tokio::test]
     async fn test_with_inter_children_change_only(
         #[values(false, true)] source_unbounded: bool,
+        #[values(false, true)] prefer_existing_sort: bool,
     ) -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr_default("a", &schema)];
         let source = if source_unbounded {
             stream_exec_ordered(&schema, sort_exprs)
         } else {
-            csv_exec_sorted(&schema, sort_exprs)
+            memory_exec_sorted(&schema, sort_exprs)
         };
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
@@ -538,7 +567,7 @@ mod tests {
             "            CoalescePartitionsExec",
             "              RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "                RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "                  CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true",
+            "                  MemoryExec: partitions=1, partition_sizes=[1], 
output_ordering=a@0 ASC",
         ];
 
         // Expected unbounded result (same for with and without flag)
@@ -564,7 +593,7 @@ mod tests {
             "            CoalescePartitionsExec",
             "              RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "                RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "                  CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true",
+            "                  MemoryExec: partitions=1, partition_sizes=[1], 
output_ordering=a@0 ASC",
         ];
         let expected_optimized_bounded_sort_preserve = [
             "SortPreservingMergeExec: [a@0 ASC]",
@@ -574,7 +603,7 @@ mod tests {
             "        SortPreservingMergeExec: [a@0 ASC]",
             "          RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC",
             "            RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "              CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC], has_header=true",
+            "              MemoryExec: partitions=1, partition_sizes=[1], 
output_ordering=a@0 ASC",
         ];
         assert_optimized_in_all_boundedness_situations!(
             expected_input_unbounded,
@@ -583,7 +612,8 @@ mod tests {
             expected_optimized_bounded,
             expected_optimized_bounded_sort_preserve,
             physical_plan,
-            source_unbounded
+            source_unbounded,
+            prefer_existing_sort
         );
         Ok(())
     }
@@ -592,13 +622,14 @@ mod tests {
     #[tokio::test]
     async fn test_replace_multiple_input_repartition_2(
         #[values(false, true)] source_unbounded: bool,
+        #[values(false, true)] prefer_existing_sort: bool,
     ) -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
         let source = if source_unbounded {
             stream_exec_ordered(&schema, sort_exprs)
         } else {
-            csv_exec_sorted(&schema, sort_exprs)
+            memory_exec_sorted(&schema, sort_exprs)
         };
         let repartition_rr = repartition_exec_round_robin(source);
         let filter = filter_exec(repartition_rr);
@@ -623,7 +654,7 @@ mod tests {
             "    RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "      FilterExec: c@1 > 3",
             "        RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "          CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "          MemoryExec: partitions=1, partition_sizes=[1], 
output_ordering=a@0 ASC NULLS LAST",
         ];
 
         // Expected unbounded result (same for with and without flag)
@@ -642,14 +673,14 @@ mod tests {
             "    RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "      FilterExec: c@1 > 3",
             "        RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "          CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "          MemoryExec: partitions=1, partition_sizes=[1], 
output_ordering=a@0 ASC NULLS LAST",
         ];
         let expected_optimized_bounded_sort_preserve = [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
             "  RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
             "    FilterExec: c@1 > 3",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "        CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "        MemoryExec: partitions=1, partition_sizes=[1], 
output_ordering=a@0 ASC NULLS LAST",
         ];
         assert_optimized_in_all_boundedness_situations!(
             expected_input_unbounded,
@@ -658,7 +689,8 @@ mod tests {
             expected_optimized_bounded,
             expected_optimized_bounded_sort_preserve,
             physical_plan,
-            source_unbounded
+            source_unbounded,
+            prefer_existing_sort
         );
         Ok(())
     }
@@ -667,13 +699,14 @@ mod tests {
     #[tokio::test]
     async fn test_replace_multiple_input_repartition_with_extra_steps(
         #[values(false, true)] source_unbounded: bool,
+        #[values(false, true)] prefer_existing_sort: bool,
     ) -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
         let source = if source_unbounded {
             stream_exec_ordered(&schema, sort_exprs)
         } else {
-            csv_exec_sorted(&schema, sort_exprs)
+            memory_exec_sorted(&schema, sort_exprs)
         };
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
@@ -701,7 +734,7 @@ mod tests {
             "      FilterExec: c@1 > 3",
             "        RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "          RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "            CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "            MemoryExec: partitions=1, partition_sizes=[1], 
output_ordering=a@0 ASC NULLS LAST",
         ];
 
         // Expected unbounded result (same for with and without flag)
@@ -722,7 +755,7 @@ mod tests {
             "      FilterExec: c@1 > 3",
             "        RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "          RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "            CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "            MemoryExec: partitions=1, partition_sizes=[1], 
output_ordering=a@0 ASC NULLS LAST",
         ];
         let expected_optimized_bounded_sort_preserve = [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
@@ -730,7 +763,7 @@ mod tests {
             "    FilterExec: c@1 > 3",
             "      RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
             "        RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "          CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "          MemoryExec: partitions=1, partition_sizes=[1], 
output_ordering=a@0 ASC NULLS LAST",
         ];
         assert_optimized_in_all_boundedness_situations!(
             expected_input_unbounded,
@@ -739,7 +772,8 @@ mod tests {
             expected_optimized_bounded,
             expected_optimized_bounded_sort_preserve,
             physical_plan,
-            source_unbounded
+            source_unbounded,
+            prefer_existing_sort
         );
         Ok(())
     }
@@ -748,13 +782,14 @@ mod tests {
     #[tokio::test]
     async fn test_replace_multiple_input_repartition_with_extra_steps_2(
         #[values(false, true)] source_unbounded: bool,
+        #[values(false, true)] prefer_existing_sort: bool,
     ) -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
         let source = if source_unbounded {
             stream_exec_ordered(&schema, sort_exprs)
         } else {
-            csv_exec_sorted(&schema, sort_exprs)
+            memory_exec_sorted(&schema, sort_exprs)
         };
         let repartition_rr = repartition_exec_round_robin(source);
         let coalesce_batches_exec_1 = coalesce_batches_exec(repartition_rr);
@@ -786,7 +821,7 @@ mod tests {
             "        RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "          CoalesceBatchesExec: target_batch_size=8192",
             "            RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "              CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "              MemoryExec: partitions=1, partition_sizes=[1], 
output_ordering=a@0 ASC NULLS LAST",
         ];
 
         // Expected unbounded result (same for with and without flag)
@@ -809,7 +844,7 @@ mod tests {
             "        RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "          CoalesceBatchesExec: target_batch_size=8192",
             "            RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "              CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "              MemoryExec: partitions=1, partition_sizes=[1], 
output_ordering=a@0 ASC NULLS LAST",
         ];
         let expected_optimized_bounded_sort_preserve = [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
@@ -818,7 +853,7 @@ mod tests {
             "      RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
             "        CoalesceBatchesExec: target_batch_size=8192",
             "          RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "            CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "            MemoryExec: partitions=1, partition_sizes=[1], 
output_ordering=a@0 ASC NULLS LAST",
         ];
         assert_optimized_in_all_boundedness_situations!(
             expected_input_unbounded,
@@ -827,7 +862,8 @@ mod tests {
             expected_optimized_bounded,
             expected_optimized_bounded_sort_preserve,
             physical_plan,
-            source_unbounded
+            source_unbounded,
+            prefer_existing_sort
         );
         Ok(())
     }
@@ -836,13 +872,14 @@ mod tests {
     #[tokio::test]
     async fn test_not_replacing_when_no_need_to_preserve_sorting(
         #[values(false, true)] source_unbounded: bool,
+        #[values(false, true)] prefer_existing_sort: bool,
     ) -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
         let source = if source_unbounded {
             stream_exec_ordered(&schema, sort_exprs)
         } else {
-            csv_exec_sorted(&schema, sort_exprs)
+            memory_exec_sorted(&schema, sort_exprs)
         };
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
@@ -867,7 +904,7 @@ mod tests {
             "    FilterExec: c@1 > 3",
             "      RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "        RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "          CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "          MemoryExec: partitions=1, partition_sizes=[1], 
output_ordering=a@0 ASC NULLS LAST",
         ];
 
         // Expected unbounded result (same for with and without flag)
@@ -887,7 +924,7 @@ mod tests {
             "    FilterExec: c@1 > 3",
             "      RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "        RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "          CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "          MemoryExec: partitions=1, partition_sizes=[1], 
output_ordering=a@0 ASC NULLS LAST",
         ];
         let expected_optimized_bounded_sort_preserve = 
expected_optimized_bounded;
 
@@ -898,7 +935,8 @@ mod tests {
             expected_optimized_bounded,
             expected_optimized_bounded_sort_preserve,
             physical_plan,
-            source_unbounded
+            source_unbounded,
+            prefer_existing_sort
         );
         Ok(())
     }
@@ -907,13 +945,14 @@ mod tests {
     #[tokio::test]
     async fn test_with_multiple_replacable_repartitions(
         #[values(false, true)] source_unbounded: bool,
+        #[values(false, true)] prefer_existing_sort: bool,
     ) -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
         let source = if source_unbounded {
             stream_exec_ordered(&schema, sort_exprs)
         } else {
-            csv_exec_sorted(&schema, sort_exprs)
+            memory_exec_sorted(&schema, sort_exprs)
         };
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
@@ -944,7 +983,7 @@ mod tests {
             "        FilterExec: c@1 > 3",
             "          RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "            RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "              CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "              MemoryExec: partitions=1, partition_sizes=[1], 
output_ordering=a@0 ASC NULLS LAST",
         ];
 
         // Expected unbounded result (same for with and without flag)
@@ -967,7 +1006,7 @@ mod tests {
             "        FilterExec: c@1 > 3",
             "          RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "            RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "              CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "              MemoryExec: partitions=1, partition_sizes=[1], 
output_ordering=a@0 ASC NULLS LAST",
         ];
         let expected_optimized_bounded_sort_preserve = [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
@@ -976,7 +1015,7 @@ mod tests {
             "      FilterExec: c@1 > 3",
             "        RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
             "          RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "            CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "            MemoryExec: partitions=1, partition_sizes=[1], 
output_ordering=a@0 ASC NULLS LAST",
         ];
         assert_optimized_in_all_boundedness_situations!(
             expected_input_unbounded,
@@ -985,7 +1024,8 @@ mod tests {
             expected_optimized_bounded,
             expected_optimized_bounded_sort_preserve,
             physical_plan,
-            source_unbounded
+            source_unbounded,
+            prefer_existing_sort
         );
         Ok(())
     }
@@ -994,13 +1034,14 @@ mod tests {
     #[tokio::test]
     async fn test_not_replace_with_different_orderings(
         #[values(false, true)] source_unbounded: bool,
+        #[values(false, true)] prefer_existing_sort: bool,
     ) -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
         let source = if source_unbounded {
             stream_exec_ordered(&schema, sort_exprs)
         } else {
-            csv_exec_sorted(&schema, sort_exprs)
+            memory_exec_sorted(&schema, sort_exprs)
         };
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
@@ -1028,7 +1069,7 @@ mod tests {
             "  SortExec: expr=[c@1 ASC], preserve_partitioning=[true]",
             "    RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "        CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "        MemoryExec: partitions=1, partition_sizes=[1], 
output_ordering=a@0 ASC NULLS LAST",
         ];
 
         // Expected unbounded result (same for with and without flag)
@@ -1046,7 +1087,7 @@ mod tests {
             "  SortExec: expr=[c@1 ASC], preserve_partitioning=[true]",
             "    RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "        CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "        MemoryExec: partitions=1, partition_sizes=[1], 
output_ordering=a@0 ASC NULLS LAST",
         ];
         let expected_optimized_bounded_sort_preserve = 
expected_optimized_bounded;
 
@@ -1057,7 +1098,8 @@ mod tests {
             expected_optimized_bounded,
             expected_optimized_bounded_sort_preserve,
             physical_plan,
-            source_unbounded
+            source_unbounded,
+            prefer_existing_sort
         );
         Ok(())
     }
@@ -1066,13 +1108,14 @@ mod tests {
     #[tokio::test]
     async fn test_with_lost_ordering(
         #[values(false, true)] source_unbounded: bool,
+        #[values(false, true)] prefer_existing_sort: bool,
     ) -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
         let source = if source_unbounded {
             stream_exec_ordered(&schema, sort_exprs)
         } else {
-            csv_exec_sorted(&schema, sort_exprs)
+            memory_exec_sorted(&schema, sort_exprs)
         };
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
@@ -1093,7 +1136,7 @@ mod tests {
             "  CoalescePartitionsExec",
             "    RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "        CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "        MemoryExec: partitions=1, partition_sizes=[1], 
output_ordering=a@0 ASC NULLS LAST",
         ];
 
         // Expected unbounded result (same for with and without flag)
@@ -1110,13 +1153,13 @@ mod tests {
             "  CoalescePartitionsExec",
             "    RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8",
             "      RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "        CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "        MemoryExec: partitions=1, partition_sizes=[1], 
output_ordering=a@0 ASC NULLS LAST",
         ];
         let expected_optimized_bounded_sort_preserve = [
             "SortPreservingMergeExec: [a@0 ASC NULLS LAST]",
             "  RepartitionExec: partitioning=Hash([c@1], 8), 
input_partitions=8, preserve_order=true, sort_exprs=a@0 ASC NULLS LAST",
             "    RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1",
-            "      CsvExec: file_groups={1 group: [[file_path]]}, 
projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "      MemoryExec: partitions=1, partition_sizes=[1], 
output_ordering=a@0 ASC NULLS LAST",
         ];
         assert_optimized_in_all_boundedness_situations!(
             expected_input_unbounded,
@@ -1125,7 +1168,8 @@ mod tests {
             expected_optimized_bounded,
             expected_optimized_bounded_sort_preserve,
             physical_plan,
-            source_unbounded
+            source_unbounded,
+            prefer_existing_sort
         );
         Ok(())
     }
@@ -1134,13 +1178,14 @@ mod tests {
     #[tokio::test]
     async fn test_with_lost_and_kept_ordering(
         #[values(false, true)] source_unbounded: bool,
+        #[values(false, true)] prefer_existing_sort: bool,
     ) -> Result<()> {
         let schema = create_test_schema()?;
         let sort_exprs = vec![sort_expr("a", &schema)];
         let source = if source_unbounded {
             stream_exec_ordered(&schema, sort_exprs)
         } else {
-            csv_exec_sorted(&schema, sort_exprs)
+            memory_exec_sorted(&schema, sort_exprs)
         };
         let repartition_rr = repartition_exec_round_robin(source);
         let repartition_hash = repartition_exec_hash(repartition_rr);
@@ -1184,7 +1229,7 @@ mod tests {
             "            CoalescePartitionsExec",
             "              RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
             "                RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            "                  CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "                  MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST",
         ];
 
         // Expected unbounded result (same for with and without flag)
@@ -1211,7 +1256,7 @@ mod tests {
             "            CoalescePartitionsExec",
             "              RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
             "                RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            "                  CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "                  MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST",
         ];
         let expected_optimized_bounded_sort_preserve = [
             "SortPreservingMergeExec: [c@1 ASC]",
@@ -1222,7 +1267,7 @@ mod tests {
             "          CoalescePartitionsExec",
             "            RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
             "              RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            "                CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "                MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST",
         ];
         assert_optimized_in_all_boundedness_situations!(
             expected_input_unbounded,
@@ -1231,7 +1276,8 @@ mod tests {
             expected_optimized_bounded,
             expected_optimized_bounded_sort_preserve,
             physical_plan,
-            source_unbounded
+            source_unbounded,
+            prefer_existing_sort
         );
         Ok(())
     }
@@ -1240,6 +1286,7 @@ mod tests {
     #[tokio::test]
     async fn test_with_multiple_child_trees(
         #[values(false, true)] source_unbounded: bool,
+        #[values(false, true)] prefer_existing_sort: bool,
     ) -> Result<()> {
         let schema = create_test_schema()?;
 
@@ -1247,7 +1294,7 @@ mod tests {
         let left_source = if source_unbounded {
             stream_exec_ordered(&schema, left_sort_exprs)
         } else {
-            csv_exec_sorted(&schema, left_sort_exprs)
+            memory_exec_sorted(&schema, left_sort_exprs)
         };
         let left_repartition_rr = repartition_exec_round_robin(left_source);
         let left_repartition_hash = repartition_exec_hash(left_repartition_rr);
@@ -1258,7 +1305,7 @@ mod tests {
         let right_source = if source_unbounded {
             stream_exec_ordered(&schema, right_sort_exprs)
         } else {
-            csv_exec_sorted(&schema, right_sort_exprs)
+            memory_exec_sorted(&schema, right_sort_exprs)
         };
         let right_repartition_rr = repartition_exec_round_robin(right_source);
         let right_repartition_hash = repartition_exec_hash(right_repartition_rr);
@@ -1299,11 +1346,11 @@ mod tests {
             "      CoalesceBatchesExec: target_batch_size=4096",
             "        RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
             "          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "            MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST",
             "      CoalesceBatchesExec: target_batch_size=4096",
             "        RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
             "          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "            MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST",
         ];
 
         // Expected unbounded result (same for with and without flag)
@@ -1330,11 +1377,11 @@ mod tests {
             "      CoalesceBatchesExec: target_batch_size=4096",
             "        RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
             "          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "            MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST",
             "      CoalesceBatchesExec: target_batch_size=4096",
             "        RepartitionExec: partitioning=Hash([c@1], 8), input_partitions=8",
             "          RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
-            "            CsvExec: file_groups={1 group: [[file_path]]}, projection=[a, c, d], output_ordering=[a@0 ASC NULLS LAST], has_header=true",
+            "            MemoryExec: partitions=1, partition_sizes=[1], output_ordering=a@0 ASC NULLS LAST",
         ];
         let expected_optimized_bounded_sort_preserve = expected_optimized_bounded;
 
@@ -1345,7 +1392,8 @@ mod tests {
             expected_optimized_bounded,
             expected_optimized_bounded_sort_preserve,
             physical_plan,
-            source_unbounded
+            source_unbounded,
+            prefer_existing_sort
         );
         Ok(())
     }
@@ -1492,33 +1540,36 @@ mod tests {
         )
     }
 
-    // creates a csv exec source for the test purposes
-    // projection and has_header parameters are given static due to testing needs
-    fn csv_exec_sorted(
+    // creates a memory exec source for the test purposes
+    // projection parameter is given static due to testing needs
+    fn memory_exec_sorted(
         schema: &SchemaRef,
         sort_exprs: impl IntoIterator<Item = PhysicalSortExpr>,
     ) -> Arc<dyn ExecutionPlan> {
-        let sort_exprs = sort_exprs.into_iter().collect();
-        let projection: Vec<usize> = vec![0, 2, 3];
+        pub fn make_partition(schema: &SchemaRef, sz: i32) -> RecordBatch {
+            let values = (0..sz).collect::<Vec<_>>();
+            let arr = Arc::new(Int32Array::from(values));
+            let arr = arr as ArrayRef;
 
-        Arc::new(
-            CsvExec::builder(
-                FileScanConfig::new(
-                    ObjectStoreUrl::parse("test:///").unwrap(),
-                    schema.clone(),
-                )
-                .with_file(PartitionedFile::new("file_path".to_string(), 100))
-                .with_projection(Some(projection))
-                .with_output_ordering(vec![sort_exprs]),
+            RecordBatch::try_new(
+                schema.clone(),
+                vec![arr.clone(), arr.clone(), arr.clone(), arr],
             )
-            .with_has_header(true)
-            .with_delimeter(0)
-            .with_quote(b'"')
-            .with_escape(None)
-            .with_comment(None)
-            .with_newlines_in_values(false)
-            .with_file_compression_type(FileCompressionType::UNCOMPRESSED)
-            .build(),
-        )
+            .unwrap()
+        }
+
+        let rows = 5;
+        let partitions = 1;
+        let sort_exprs = sort_exprs.into_iter().collect();
+        Arc::new({
+            let data: Vec<Vec<_>> = (0..partitions)
+                .map(|_| vec![make_partition(schema, rows)])
+                .collect();
+            let projection: Vec<usize> = vec![0, 2, 3];
+            MemoryExec::try_new(&data, schema.clone(), Some(projection))
+                .unwrap()
+                .try_with_sort_information(vec![sort_exprs])
+                .unwrap()
+        })
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to