This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 99c811a3bf Fix: External sort failing on `StringView` due to shared 
buffers (#14823)
99c811a3bf is described below

commit 99c811a3bf994437122a71c31315a2e7471b58e8
Author: Yongting You <[email protected]>
AuthorDate: Wed Feb 26 23:46:51 2025 +0800

    Fix: External sort failing on `StringView` due to shared buffers (#14823)
    
    * organize stringview arrays before spilling
    
    * add unit test
    
    * fix
    
    * add comment
    
    * review
    
    * Update datafusion/physical-plan/src/sorts/sort.rs
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/core/tests/memory_limit/mod.rs  |  69 ++++++++++++++-
 datafusion/physical-plan/src/sorts/sort.rs | 131 ++++++++++++++++++++---------
 2 files changed, 156 insertions(+), 44 deletions(-)

diff --git a/datafusion/core/tests/memory_limit/mod.rs 
b/datafusion/core/tests/memory_limit/mod.rs
index a1985a1aa4..2deb8fde2d 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -23,9 +23,10 @@ use std::sync::{Arc, LazyLock};
 
 #[cfg(feature = "extended_tests")]
 mod memory_limit_validation;
-use arrow::array::{ArrayRef, DictionaryArray, RecordBatch};
+use arrow::array::{ArrayRef, DictionaryArray, Int32Array, RecordBatch, 
StringViewArray};
 use arrow::compute::SortOptions;
 use arrow::datatypes::{Int32Type, SchemaRef};
+use arrow_schema::{DataType, Field, Schema};
 use datafusion::assert_batches_eq;
 use datafusion::datasource::memory::MemorySourceConfig;
 use datafusion::datasource::source::DataSourceExec;
@@ -41,7 +42,7 @@ use datafusion_catalog::streaming::StreamingTable;
 use datafusion_catalog::Session;
 use datafusion_common::{assert_contains, Result};
 use datafusion_execution::memory_pool::{
-    GreedyMemoryPool, MemoryPool, TrackConsumersPool,
+    FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool,
 };
 use datafusion_execution::TaskContext;
 use datafusion_expr::{Expr, TableType};
@@ -49,6 +50,7 @@ use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
 use datafusion_physical_optimizer::join_selection::JoinSelection;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
 use datafusion_physical_plan::spill::get_record_batch_memory_size;
+use rand::Rng;
 use test_utils::AccessLogGenerator;
 
 use async_trait::async_trait;
@@ -403,6 +405,69 @@ async fn oom_with_tracked_consumer_pool() {
         .await
 }
 
+/// Regression test: if a spilled `StringViewArray`'s buffer is referenced by
+/// other batches that also need to be spilled, the spill writer repeatedly
+/// writes out the same buffer, and after reading back, each batch's size
+/// explodes.
+///
+/// This test setup causes 10 spills, each of which sorts around 20 batches.
+/// If the spilled record batches explode in memory, this test fails.
+#[tokio::test]
+async fn test_stringview_external_sort() {
+    let mut rng = rand::thread_rng();
+    let array_length = 1000;
+    let num_batches = 200;
+    // Batches contain two columns: random 100-byte string, and random i32
+    let mut batches = Vec::with_capacity(num_batches);
+
+    for _ in 0..num_batches {
+        let strings: Vec<String> = (0..array_length)
+            .map(|_| {
+                (0..100)
+                    .map(|_| rng.gen_range(0..=u8::MAX) as char)
+                    .collect()
+            })
+            .collect();
+
+        let string_array = StringViewArray::from(strings);
+        let array_ref: ArrayRef = Arc::new(string_array);
+
+        let random_numbers: Vec<i32> =
+            (0..array_length).map(|_| rng.gen_range(0..=1000)).collect();
+        let int_array = Int32Array::from(random_numbers);
+        let int_array_ref: ArrayRef = Arc::new(int_array);
+
+        let batch = RecordBatch::try_new(
+            Arc::new(Schema::new(vec![
+                Field::new("strings", DataType::Utf8View, false),
+                Field::new("random_numbers", DataType::Int32, false),
+            ])),
+            vec![array_ref, int_array_ref],
+        )
+        .unwrap();
+        batches.push(batch);
+    }
+
+    // Run a sql query that sorts the batches by the int column
+    let schema = batches[0].schema();
+    let table = MemTable::try_new(schema, vec![batches]).unwrap();
+    let builder = RuntimeEnvBuilder::new()
+        .with_memory_pool(Arc::new(FairSpillPool::new(60 * 1024 * 1024)));
+    let runtime = builder.build_arc().unwrap();
+
+    let config = SessionConfig::new().with_sort_spill_reservation_bytes(40 * 
1024 * 1024);
+
+    let ctx = SessionContext::new_with_config_rt(config, runtime);
+    ctx.register_table("t", Arc::new(table)).unwrap();
+
+    let df = ctx
+        .sql("explain analyze SELECT * FROM t ORDER BY random_numbers")
+        .await
+        .unwrap();
+
+    let _ = df.collect().await.expect("Query execution failed");
+}
+
 /// Run the query with the specified memory limit,
 /// and verifies the expected errors are returned
 #[derive(Clone, Debug)]
diff --git a/datafusion/physical-plan/src/sorts/sort.rs 
b/datafusion/physical-plan/src/sorts/sort.rs
index 30b5abcf88..d84068527a 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -24,7 +24,7 @@ use std::fmt;
 use std::fmt::{Debug, Formatter};
 use std::sync::Arc;
 
-use crate::common::{spawn_buffered, IPCWriter};
+use crate::common::spawn_buffered;
 use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
 use crate::expressions::PhysicalSortExpr;
 use crate::limit::LimitStream;
@@ -44,7 +44,9 @@ use crate::{
     Statistics,
 };
 
-use arrow::array::{Array, RecordBatch, RecordBatchOptions, UInt32Array};
+use arrow::array::{
+    Array, RecordBatch, RecordBatchOptions, StringViewArray, UInt32Array,
+};
 use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays, 
SortColumn};
 use arrow::datatypes::{DataType, SchemaRef};
 use arrow::row::{RowConverter, SortField};
@@ -300,6 +302,7 @@ impl ExternalSorter {
         if input.num_rows() == 0 {
             return Ok(());
         }
+
         self.reserve_memory_for_merge()?;
 
         let size = get_reserved_byte_for_record_batch(&input);
@@ -397,6 +400,8 @@ impl ExternalSorter {
             return Ok(0);
         }
 
+        self.organize_stringview_arrays()?;
+
         debug!("Spilling sort data of ExternalSorter to disk whilst 
inserting");
 
         let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
@@ -414,6 +419,69 @@ impl ExternalSorter {
         Ok(used)
     }
 
+    /// Reconstruct `self.in_mem_batches` to organize the payload buffers of 
each
+    /// `StringViewArray` in sequential order by calling `gc()` on them.
+    ///
+    /// Note this is a workaround until 
<https://github.com/apache/arrow-rs/issues/7185> is
+    /// available
+    ///
+    /// # Rationale
+    /// After (merge-based) sorting, all batches will be sorted into a single 
run,
+    /// but physically this sorted run is chunked into many small batches. For
+    /// `StringViewArray`s inside each sorted run, their inner buffers are not
+    /// re-constructed by default, leading to non-sequential payload locations
+    /// (permuted by the Arrow `interleave()` kernel). A single payload buffer might
+    /// be shared by multiple `RecordBatch`es.
+    /// When writing each batch to disk, the writer has to write all 
referenced buffers,
+    /// because they have to be read back one by one to reduce memory usage. 
This
+    /// causes extra disk reads and writes, and potentially execution failure.
+    ///
+    /// # Example
+    /// Before sorting:
+    /// batch1 -> buffer1
+    /// batch2 -> buffer2
+    ///
+    /// sorted_batch1 -> buffer1
+    ///               -> buffer2
+    /// sorted_batch2 -> buffer1
+    ///               -> buffer2
+    ///
+    /// Then when spilling each batch, the writer has to write all referenced 
buffers
+    /// repeatedly.
+    fn organize_stringview_arrays(&mut self) -> Result<()> {
+        let mut organized_batches = 
Vec::with_capacity(self.in_mem_batches.len());
+
+        for batch in self.in_mem_batches.drain(..) {
+            let mut new_columns: Vec<Arc<dyn Array>> =
+                Vec::with_capacity(batch.num_columns());
+
+            let mut arr_mutated = false;
+            for array in batch.columns() {
+                if let Some(string_view_array) =
+                    array.as_any().downcast_ref::<StringViewArray>()
+                {
+                    let new_array = string_view_array.gc();
+                    new_columns.push(Arc::new(new_array));
+                    arr_mutated = true;
+                } else {
+                    new_columns.push(Arc::clone(array));
+                }
+            }
+
+            let organized_batch = if arr_mutated {
+                RecordBatch::try_new(batch.schema(), new_columns)?
+            } else {
+                batch
+            };
+
+            organized_batches.push(organized_batch);
+        }
+
+        self.in_mem_batches = organized_batches;
+
+        Ok(())
+    }
+
     /// Sorts the in_mem_batches in place
     ///
     /// Sorting may have freed memory, especially if fetch is `Some`. If
@@ -439,36 +507,18 @@ impl ExternalSorter {
         // `self.in_mem_batches` is already taken away by the sort_stream, now 
it is empty.
         // We'll gradually collect the sorted stream into self.in_mem_batches, 
or directly
         // write sorted batches to disk when the memory is insufficient.
-        let mut spill_writer: Option<IPCWriter> = None;
         while let Some(batch) = sorted_stream.next().await {
             let batch = batch?;
-            match &mut spill_writer {
-                None => {
-                    let sorted_size = 
get_reserved_byte_for_record_batch(&batch);
-                    if self.reservation.try_grow(sorted_size).is_err() {
-                        // Directly write in_mem_batches as well as all the 
remaining batches in
-                        // sorted_stream to disk. Further batches fetched from 
`sorted_stream` will
-                        // be handled by the `Some(writer)` matching arm.
-                        let spill_file =
-                            
self.runtime.disk_manager.create_tmp_file("Sorting")?;
-                        let mut writer = IPCWriter::new(spill_file.path(), 
&self.schema)?;
-                        // Flush everything in memory to the spill file
-                        for batch in self.in_mem_batches.drain(..) {
-                            writer.write(&batch)?;
-                        }
-                        // as well as the newly sorted batch
-                        writer.write(&batch)?;
-                        spill_writer = Some(writer);
-                        self.reservation.free();
-                        self.spills.push(spill_file);
-                    } else {
-                        self.in_mem_batches.push(batch);
-                        self.in_mem_batches_sorted = true;
-                    }
-                }
-                Some(writer) => {
-                    writer.write(&batch)?;
-                }
+            let sorted_size = get_reserved_byte_for_record_batch(&batch);
+            if self.reservation.try_grow(sorted_size).is_err() {
+                // Although the reservation is not enough, the batch is
+                // already in memory, so it's okay to combine it with 
previously
+                // sorted batches, and spill together.
+                self.in_mem_batches.push(batch);
+                self.spill().await?; // reservation is freed in spill()
+            } else {
+                self.in_mem_batches.push(batch);
+                self.in_mem_batches_sorted = true;
             }
         }
 
@@ -476,17 +526,10 @@ impl ExternalSorter {
         // upcoming `self.reserve_memory_for_merge()` may fail due to 
insufficient memory.
         drop(sorted_stream);
 
-        if let Some(writer) = &mut spill_writer {
-            writer.finish()?;
-            self.metrics.spill_count.add(1);
-            self.metrics.spilled_rows.add(writer.num_rows);
-            self.metrics.spilled_bytes.add(writer.num_bytes);
-        }
-
         // Sorting may free up some memory especially when fetch is `Some`. If 
we have
         // not freed more than 50% of the memory, then we have to spill to 
free up more
         // memory for inserting more batches.
-        if spill_writer.is_none() && self.reservation.size() > before / 2 {
+        if self.reservation.size() > before / 2 {
             // We have not freed more than 50% of the memory, so we have to 
spill to
             // free up more memory
             self.spill().await?;
@@ -1422,10 +1465,14 @@ mod tests {
         let spill_count = metrics.spill_count().unwrap();
         let spilled_rows = metrics.spilled_rows().unwrap();
         let spilled_bytes = metrics.spilled_bytes().unwrap();
-        // Processing 840 KB of data using 400 KB of memory requires at least 
2 spills
-        // It will spill roughly 18000 rows and 800 KBytes.
-        // We leave a little wiggle room for the actual numbers.
-        assert!((2..=10).contains(&spill_count));
+
+        // This test case is processing 840KB of data using 400KB of memory. 
Note
+        // that buffered batches can't be dropped until all sorted batches are
+        // generated, so we can only buffer `sort_spill_reservation_bytes` of 
sorted
+        // batches.
+        // The number of spills is roughly calculated as:
+        //  `number_of_batches / (sort_spill_reservation_bytes / batch_size)`
+        assert!((12..=18).contains(&spill_count));
         assert!((15000..=20000).contains(&spilled_rows));
         assert!((700000..=900000).contains(&spilled_bytes));
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to