alamb commented on code in PR #7180:
URL: https://github.com/apache/arrow-datafusion/pull/7180#discussion_r1284769893


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -253,11 +251,27 @@ impl ExternalSorter {
     /// Appends an unsorted [`RecordBatch`] to `in_mem_batches`
     ///
     /// Updates memory usage metrics, and possibly triggers spilling to disk
-    async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
+    async fn insert_batch(&mut self, mut input: RecordBatch) -> Result<()> {
         if input.num_rows() == 0 {
             return Ok(());
         }
 
+        let mut batch_sorted = false;
+        if self.fetch.map_or(false, |f| f < input.num_rows()) {
+            // Eagerly sort the batch to potentially reduce the number of rows
+            // after applying the fetch parameter; first perform a memory 
reservation
+            // for the sorting procedure.
+            let mut reservation =

Review Comment:
   I don't think we need a new consumer here for each batch insertion -- we 
could just update the main reservation on `self.reservation`
   
   So something like do the sort and then update the size on limit
   ```
               input = sort_batch(&input, &self.expr, self.fetch)?;
               reservation.try_grow(input.get_array_memory_size());
   ```
   
   
   Note the accounting is reworked in 
https://github.com/apache/arrow-datafusion/pull/7130 



##########
datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs:
##########
@@ -30,76 +30,85 @@ use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::physical_plan::{collect, ExecutionPlan};
 use datafusion::prelude::{SessionConfig, SessionContext};
 use rand::Rng;
+use rstest::rstest;
 use std::sync::Arc;
 use test_utils::{batches_to_vec, partitions_to_sorted_vec};
 
-#[tokio::test]
-#[cfg_attr(tarpaulin, ignore)]
-async fn test_sort_1k_mem() {
-    run_sort(10240, vec![(5, false), (20000, true), (1000000, true)]).await
-}
-
-#[tokio::test]
-#[cfg_attr(tarpaulin, ignore)]
-async fn test_sort_100k_mem() {
-    run_sort(102400, vec![(5, false), (20000, false), (1000000, true)]).await
-}
-
-#[tokio::test]
-async fn test_sort_unlimited_mem() {
-    run_sort(
-        usize::MAX,
-        vec![(5, false), (2000, false), (1000000, false)],
-    )
-    .await
-}
-
 /// Sort the input using SortExec and ensure the results are correct according 
to `Vec::sort`
-async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) {
-    for (size, spill) in size_spill {
-        let input = vec![make_staggered_batches(size)];
-        let first_batch = input
-            .iter()
-            .flat_map(|p| p.iter())
-            .next()
-            .expect("at least one batch");
-        let schema = first_batch.schema();
-
-        let sort = vec![PhysicalSortExpr {
-            expr: col("x", &schema).unwrap(),
-            options: SortOptions {
-                descending: false,
-                nulls_first: true,
-            },
-        }];
-
-        let exec = MemoryExec::try_new(&input, schema, None).unwrap();
-        let sort = Arc::new(SortExec::new(sort, Arc::new(exec)));
-
-        let runtime_config = RuntimeConfig::new()
-            .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)));
-        let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap());
-        let session_ctx = SessionContext::with_config_rt(SessionConfig::new(), 
runtime);
-
-        let task_ctx = session_ctx.task_ctx();
-        let collected = collect(sort.clone(), task_ctx).await.unwrap();
-
-        let expected = partitions_to_sorted_vec(&input);
-        let actual = batches_to_vec(&collected);
-
-        if spill {
-            assert_ne!(sort.metrics().unwrap().spill_count().unwrap(), 0);
-        } else {
-            assert_eq!(sort.metrics().unwrap().spill_count().unwrap(), 0);
-        }
+#[rstest]

Review Comment:
   ❤️ 



##########
datafusion/core/tests/memory_limit.rs:
##########
@@ -45,17 +47,38 @@ fn init() {
     let _ = env_logger::try_init();
 }
 
+#[rstest]
+#[case::cant_grow_reservation(vec!["Resources exhausted: Failed to allocate 
additional", "ExternalSorter"], 100_000)]
+#[case::cant_spill_to_disk(vec!["Resources exhausted: Memory Exhausted while 
Sorting (DiskManager is disabled)"], 200_000)]
+#[case::no_oom(vec![], 600_000)]
 #[tokio::test]
-async fn oom_sort() {
+async fn sort(#[case] expected_errors: Vec<&str>, #[case] memory_limit: usize) 
{
     TestCase::new(
         "select * from t order by host DESC",
-        vec![
-            "Resources exhausted: Memory Exhausted while Sorting (DiskManager 
is disabled)",
-        ],
-        200_000,
+        expected_errors,
+        memory_limit,
+    )
+    .run()
+    .await
+}
+
+// We expect to see lower memory thresholds in general when applying a `LIMIT` 
clause due to eager sorting

Review Comment:
   this is nice



##########
datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs:
##########
@@ -30,76 +30,85 @@ use datafusion::physical_plan::sorts::sort::SortExec;
 use datafusion::physical_plan::{collect, ExecutionPlan};
 use datafusion::prelude::{SessionConfig, SessionContext};
 use rand::Rng;
+use rstest::rstest;
 use std::sync::Arc;
 use test_utils::{batches_to_vec, partitions_to_sorted_vec};
 
-#[tokio::test]
-#[cfg_attr(tarpaulin, ignore)]
-async fn test_sort_1k_mem() {
-    run_sort(10240, vec![(5, false), (20000, true), (1000000, true)]).await
-}
-
-#[tokio::test]
-#[cfg_attr(tarpaulin, ignore)]
-async fn test_sort_100k_mem() {
-    run_sort(102400, vec![(5, false), (20000, false), (1000000, true)]).await
-}
-
-#[tokio::test]
-async fn test_sort_unlimited_mem() {
-    run_sort(
-        usize::MAX,
-        vec![(5, false), (2000, false), (1000000, false)],
-    )
-    .await
-}
-
 /// Sort the input using SortExec and ensure the results are correct according 
to `Vec::sort`
-async fn run_sort(pool_size: usize, size_spill: Vec<(usize, bool)>) {
-    for (size, spill) in size_spill {
-        let input = vec![make_staggered_batches(size)];
-        let first_batch = input
-            .iter()
-            .flat_map(|p| p.iter())
-            .next()
-            .expect("at least one batch");
-        let schema = first_batch.schema();
-
-        let sort = vec![PhysicalSortExpr {
-            expr: col("x", &schema).unwrap(),
-            options: SortOptions {
-                descending: false,
-                nulls_first: true,
-            },
-        }];
-
-        let exec = MemoryExec::try_new(&input, schema, None).unwrap();
-        let sort = Arc::new(SortExec::new(sort, Arc::new(exec)));
-
-        let runtime_config = RuntimeConfig::new()
-            .with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size)));
-        let runtime = Arc::new(RuntimeEnv::new(runtime_config).unwrap());
-        let session_ctx = SessionContext::with_config_rt(SessionConfig::new(), 
runtime);
-
-        let task_ctx = session_ctx.task_ctx();
-        let collected = collect(sort.clone(), task_ctx).await.unwrap();
-
-        let expected = partitions_to_sorted_vec(&input);
-        let actual = batches_to_vec(&collected);
-
-        if spill {
-            assert_ne!(sort.metrics().unwrap().spill_count().unwrap(), 0);
-        } else {
-            assert_eq!(sort.metrics().unwrap().spill_count().unwrap(), 0);
-        }
+#[rstest]

Review Comment:
   ❤️ 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to