This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 99c811a3bf Fix: External sort failing on `StringView` due to shared
buffers (#14823)
99c811a3bf is described below
commit 99c811a3bf994437122a71c31315a2e7471b58e8
Author: Yongting You <[email protected]>
AuthorDate: Wed Feb 26 23:46:51 2025 +0800
Fix: External sort failing on `StringView` due to shared buffers (#14823)
* organize stringview arrays before spilling
* add unit test
* fix
* add comment
* review
* Update datafusion/physical-plan/src/sorts/sort.rs
Co-authored-by: Andrew Lamb <[email protected]>
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/core/tests/memory_limit/mod.rs | 69 ++++++++++++++-
datafusion/physical-plan/src/sorts/sort.rs | 131 ++++++++++++++++++++---------
2 files changed, 156 insertions(+), 44 deletions(-)
diff --git a/datafusion/core/tests/memory_limit/mod.rs
b/datafusion/core/tests/memory_limit/mod.rs
index a1985a1aa4..2deb8fde2d 100644
--- a/datafusion/core/tests/memory_limit/mod.rs
+++ b/datafusion/core/tests/memory_limit/mod.rs
@@ -23,9 +23,10 @@ use std::sync::{Arc, LazyLock};
#[cfg(feature = "extended_tests")]
mod memory_limit_validation;
-use arrow::array::{ArrayRef, DictionaryArray, RecordBatch};
+use arrow::array::{ArrayRef, DictionaryArray, Int32Array, RecordBatch,
StringViewArray};
use arrow::compute::SortOptions;
use arrow::datatypes::{Int32Type, SchemaRef};
+use arrow_schema::{DataType, Field, Schema};
use datafusion::assert_batches_eq;
use datafusion::datasource::memory::MemorySourceConfig;
use datafusion::datasource::source::DataSourceExec;
@@ -41,7 +42,7 @@ use datafusion_catalog::streaming::StreamingTable;
use datafusion_catalog::Session;
use datafusion_common::{assert_contains, Result};
use datafusion_execution::memory_pool::{
- GreedyMemoryPool, MemoryPool, TrackConsumersPool,
+ FairSpillPool, GreedyMemoryPool, MemoryPool, TrackConsumersPool,
};
use datafusion_execution::TaskContext;
use datafusion_expr::{Expr, TableType};
@@ -49,6 +50,7 @@ use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr};
use datafusion_physical_optimizer::join_selection::JoinSelection;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::spill::get_record_batch_memory_size;
+use rand::Rng;
use test_utils::AccessLogGenerator;
use async_trait::async_trait;
@@ -403,6 +405,69 @@ async fn oom_with_tracked_consumer_pool() {
.await
}
+/// Regression case: if a spilled `StringViewArray`'s buffer is referenced by
+/// other batches which also need to be spilled, then the spill writer will
+/// repeatedly write out the same buffer, and after reading back, each batch's
+/// size will explode.
+///
+/// This test setup will cause 10 spills; each spill will sort around 20 batches.
+/// If there is memory explosion for spilled record batch, this test will fail.
+#[tokio::test]
+async fn test_stringview_external_sort() {
+ let mut rng = rand::thread_rng();
+ let array_length = 1000;
+ let num_batches = 200;
+ // Batches contain two columns: random 100-byte string, and random i32
+ let mut batches = Vec::with_capacity(num_batches);
+
+ for _ in 0..num_batches {
+ let strings: Vec<String> = (0..array_length)
+ .map(|_| {
+ (0..100)
+ .map(|_| rng.gen_range(0..=u8::MAX) as char)
+ .collect()
+ })
+ .collect();
+
+ let string_array = StringViewArray::from(strings);
+ let array_ref: ArrayRef = Arc::new(string_array);
+
+ let random_numbers: Vec<i32> =
+ (0..array_length).map(|_| rng.gen_range(0..=1000)).collect();
+ let int_array = Int32Array::from(random_numbers);
+ let int_array_ref: ArrayRef = Arc::new(int_array);
+
+ let batch = RecordBatch::try_new(
+ Arc::new(Schema::new(vec![
+ Field::new("strings", DataType::Utf8View, false),
+ Field::new("random_numbers", DataType::Int32, false),
+ ])),
+ vec![array_ref, int_array_ref],
+ )
+ .unwrap();
+ batches.push(batch);
+ }
+
+ // Run a sql query that sorts the batches by the int column
+ let schema = batches[0].schema();
+ let table = MemTable::try_new(schema, vec![batches]).unwrap();
+ let builder = RuntimeEnvBuilder::new()
+ .with_memory_pool(Arc::new(FairSpillPool::new(60 * 1024 * 1024)));
+ let runtime = builder.build_arc().unwrap();
+
+ let config = SessionConfig::new().with_sort_spill_reservation_bytes(40 *
1024 * 1024);
+
+ let ctx = SessionContext::new_with_config_rt(config, runtime);
+ ctx.register_table("t", Arc::new(table)).unwrap();
+
+ let df = ctx
+ .sql("explain analyze SELECT * FROM t ORDER BY random_numbers")
+ .await
+ .unwrap();
+
+ let _ = df.collect().await.expect("Query execution failed");
+}
+
/// Run the query with the specified memory limit,
/// and verifies the expected errors are returned
#[derive(Clone, Debug)]
diff --git a/datafusion/physical-plan/src/sorts/sort.rs
b/datafusion/physical-plan/src/sorts/sort.rs
index 30b5abcf88..d84068527a 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -24,7 +24,7 @@ use std::fmt;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
-use crate::common::{spawn_buffered, IPCWriter};
+use crate::common::spawn_buffered;
use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
use crate::expressions::PhysicalSortExpr;
use crate::limit::LimitStream;
@@ -44,7 +44,9 @@ use crate::{
Statistics,
};
-use arrow::array::{Array, RecordBatch, RecordBatchOptions, UInt32Array};
+use arrow::array::{
+ Array, RecordBatch, RecordBatchOptions, StringViewArray, UInt32Array,
+};
use arrow::compute::{concat_batches, lexsort_to_indices, take_arrays,
SortColumn};
use arrow::datatypes::{DataType, SchemaRef};
use arrow::row::{RowConverter, SortField};
@@ -300,6 +302,7 @@ impl ExternalSorter {
if input.num_rows() == 0 {
return Ok(());
}
+
self.reserve_memory_for_merge()?;
let size = get_reserved_byte_for_record_batch(&input);
@@ -397,6 +400,8 @@ impl ExternalSorter {
return Ok(0);
}
+ self.organize_stringview_arrays()?;
+
debug!("Spilling sort data of ExternalSorter to disk whilst
inserting");
let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
@@ -414,6 +419,69 @@ impl ExternalSorter {
Ok(used)
}
+ /// Reconstruct `self.in_mem_batches` to organize the payload buffers of
each
+ /// `StringViewArray` in sequential order by calling `gc()` on them.
+ ///
+ /// Note this is a workaround until
<https://github.com/apache/arrow-rs/issues/7185> is
+ /// available
+ ///
+ /// # Rationale
+ /// After (merge-based) sorting, all batches will be sorted into a single
run,
+ /// but physically this sorted run is chunked into many small batches. For
+/// `StringViewArray`s inside each sorted run, their inner buffers are not
+/// re-constructed by default, leading to non-sequential payload locations
+/// (permuted by the `interleave()` Arrow kernel). A single payload buffer might
+/// be shared by multiple `RecordBatch`es.
+ /// When writing each batch to disk, the writer has to write all
referenced buffers,
+ /// because they have to be read back one by one to reduce memory usage.
This
+ /// causes extra disk reads and writes, and potentially execution failure.
+ ///
+ /// # Example
+ /// Before sorting:
+ /// batch1 -> buffer1
+ /// batch2 -> buffer2
+ ///
+ /// sorted_batch1 -> buffer1
+ /// -> buffer2
+ /// sorted_batch2 -> buffer1
+ /// -> buffer2
+ ///
+ /// Then when spilling each batch, the writer has to write all referenced
buffers
+ /// repeatedly.
+ fn organize_stringview_arrays(&mut self) -> Result<()> {
+ let mut organized_batches =
Vec::with_capacity(self.in_mem_batches.len());
+
+ for batch in self.in_mem_batches.drain(..) {
+ let mut new_columns: Vec<Arc<dyn Array>> =
+ Vec::with_capacity(batch.num_columns());
+
+ let mut arr_mutated = false;
+ for array in batch.columns() {
+ if let Some(string_view_array) =
+ array.as_any().downcast_ref::<StringViewArray>()
+ {
+ let new_array = string_view_array.gc();
+ new_columns.push(Arc::new(new_array));
+ arr_mutated = true;
+ } else {
+ new_columns.push(Arc::clone(array));
+ }
+ }
+
+ let organized_batch = if arr_mutated {
+ RecordBatch::try_new(batch.schema(), new_columns)?
+ } else {
+ batch
+ };
+
+ organized_batches.push(organized_batch);
+ }
+
+ self.in_mem_batches = organized_batches;
+
+ Ok(())
+ }
+
/// Sorts the in_mem_batches in place
///
/// Sorting may have freed memory, especially if fetch is `Some`. If
@@ -439,36 +507,18 @@ impl ExternalSorter {
// `self.in_mem_batches` is already taken away by the sort_stream, now
it is empty.
// We'll gradually collect the sorted stream into self.in_mem_batches,
or directly
// write sorted batches to disk when the memory is insufficient.
- let mut spill_writer: Option<IPCWriter> = None;
while let Some(batch) = sorted_stream.next().await {
let batch = batch?;
- match &mut spill_writer {
- None => {
- let sorted_size =
get_reserved_byte_for_record_batch(&batch);
- if self.reservation.try_grow(sorted_size).is_err() {
- // Directly write in_mem_batches as well as all the
remaining batches in
- // sorted_stream to disk. Further batches fetched from
`sorted_stream` will
- // be handled by the `Some(writer)` matching arm.
- let spill_file =
-
self.runtime.disk_manager.create_tmp_file("Sorting")?;
- let mut writer = IPCWriter::new(spill_file.path(),
&self.schema)?;
- // Flush everything in memory to the spill file
- for batch in self.in_mem_batches.drain(..) {
- writer.write(&batch)?;
- }
- // as well as the newly sorted batch
- writer.write(&batch)?;
- spill_writer = Some(writer);
- self.reservation.free();
- self.spills.push(spill_file);
- } else {
- self.in_mem_batches.push(batch);
- self.in_mem_batches_sorted = true;
- }
- }
- Some(writer) => {
- writer.write(&batch)?;
- }
+ let sorted_size = get_reserved_byte_for_record_batch(&batch);
+ if self.reservation.try_grow(sorted_size).is_err() {
+ // Although the reservation is not enough, the batch is
+ // already in memory, so it's okay to combine it with
previously
+ // sorted batches, and spill together.
+ self.in_mem_batches.push(batch);
+ self.spill().await?; // reservation is freed in spill()
+ } else {
+ self.in_mem_batches.push(batch);
+ self.in_mem_batches_sorted = true;
}
}
@@ -476,17 +526,10 @@ impl ExternalSorter {
// upcoming `self.reserve_memory_for_merge()` may fail due to
insufficient memory.
drop(sorted_stream);
- if let Some(writer) = &mut spill_writer {
- writer.finish()?;
- self.metrics.spill_count.add(1);
- self.metrics.spilled_rows.add(writer.num_rows);
- self.metrics.spilled_bytes.add(writer.num_bytes);
- }
-
// Sorting may free up some memory especially when fetch is `Some`. If
we have
// not freed more than 50% of the memory, then we have to spill to
free up more
// memory for inserting more batches.
- if spill_writer.is_none() && self.reservation.size() > before / 2 {
+ if self.reservation.size() > before / 2 {
// We have not freed more than 50% of the memory, so we have to
spill to
// free up more memory
self.spill().await?;
@@ -1422,10 +1465,14 @@ mod tests {
let spill_count = metrics.spill_count().unwrap();
let spilled_rows = metrics.spilled_rows().unwrap();
let spilled_bytes = metrics.spilled_bytes().unwrap();
- // Processing 840 KB of data using 400 KB of memory requires at least
2 spills
- // It will spill roughly 18000 rows and 800 KBytes.
- // We leave a little wiggle room for the actual numbers.
- assert!((2..=10).contains(&spill_count));
+
+ // This test case is processing 840KB of data using 400KB of memory.
Note
+ // that buffered batches can't be dropped until all sorted batches are
+ // generated, so we can only buffer `sort_spill_reservation_bytes` of
sorted
+ // batches.
+ // The number of spills is roughly calculated as:
+ // `number_of_batches / (sort_spill_reservation_bytes / batch_size)`
+ assert!((12..=18).contains(&spill_count));
assert!((15000..=20000).contains(&spilled_rows));
assert!((700000..=900000).contains(&spilled_bytes));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]