This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new f1360b8486 Minor: move batch spilling methods to `lib.rs` to make it
reusable (#11154)
f1360b8486 is described below
commit f1360b8486641cbac15212466eddef5bd6503708
Author: Oleks V <[email protected]>
AuthorDate: Fri Jun 28 09:05:49 2024 -0700
Minor: move batch spilling methods to `lib.rs` to make it reusable (#11154)
---
.../physical-plan/src/aggregates/row_hash.rs | 6 +-
datafusion/physical-plan/src/lib.rs | 64 ++++++++++++++-
datafusion/physical-plan/src/sorts/sort.rs | 95 +++-------------------
3 files changed, 78 insertions(+), 87 deletions(-)
diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs
b/datafusion/physical-plan/src/aggregates/row_hash.rs
index ad0860b93a..27577e6c8b 100644
--- a/datafusion/physical-plan/src/aggregates/row_hash.rs
+++ b/datafusion/physical-plan/src/aggregates/row_hash.rs
@@ -29,10 +29,10 @@ use crate::aggregates::{
};
use crate::common::IPCWriter;
use crate::metrics::{BaselineMetrics, RecordOutput};
-use crate::sorts::sort::{read_spill_as_stream, sort_batch};
+use crate::sorts::sort::sort_batch;
use crate::sorts::streaming_merge;
use crate::stream::RecordBatchStreamAdapter;
-use crate::{aggregates, ExecutionPlan, PhysicalExpr};
+use crate::{aggregates, read_spill_as_stream, ExecutionPlan, PhysicalExpr};
use crate::{RecordBatchStream, SendableRecordBatchStream};
use arrow::array::*;
@@ -752,7 +752,7 @@ impl GroupedHashAggregateStream {
})),
)));
for spill in self.spill_state.spills.drain(..) {
- let stream = read_spill_as_stream(spill, schema.clone())?;
+ let stream = read_spill_as_stream(spill, schema.clone(), 2)?;
streams.push(stream);
}
self.spill_state.is_stream_merging = true;
diff --git a/datafusion/physical-plan/src/lib.rs
b/datafusion/physical-plan/src/lib.rs
index c648547c98..aef5b30796 100644
--- a/datafusion/physical-plan/src/lib.rs
+++ b/datafusion/physical-plan/src/lib.rs
@@ -19,6 +19,9 @@
use std::any::Any;
use std::fmt::Debug;
+use std::fs::File;
+use std::io::BufReader;
+use std::path::{Path, PathBuf};
use std::sync::Arc;
use crate::coalesce_partitions::CoalescePartitionsExec;
@@ -28,15 +31,18 @@ use crate::repartition::RepartitionExec;
use crate::sorts::sort_preserving_merge::SortPreservingMergeExec;
use arrow::datatypes::SchemaRef;
+use arrow::ipc::reader::FileReader;
use arrow::record_batch::RecordBatch;
use datafusion_common::config::ConfigOptions;
-use datafusion_common::Result;
+use datafusion_common::{exec_datafusion_err, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::{
EquivalenceProperties, LexOrdering, PhysicalSortExpr,
PhysicalSortRequirement,
};
use futures::stream::TryStreamExt;
+use log::debug;
+use tokio::sync::mpsc::Sender;
use tokio::task::JoinSet;
mod ordering;
@@ -87,8 +93,13 @@ pub use datafusion_physical_expr::{
};
// Backwards compatibility
+use crate::common::IPCWriter;
pub use crate::stream::EmptyRecordBatchStream;
+use crate::stream::RecordBatchReceiverStream;
+use datafusion_execution::disk_manager::RefCountedTempFile;
+use datafusion_execution::memory_pool::human_readable_size;
pub use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
+
pub mod udaf {
pub use datafusion_physical_expr_common::aggregate::{
create_aggregate_expr, AggregateFunctionExpr,
@@ -799,6 +810,57 @@ pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) ->
Vec<String> {
actual.iter().map(|elem| elem.to_string()).collect()
}
+/// Read spilled batches from the disk
+///
+/// `path` - temp file
+/// `schema` - batches schema, should be the same across batches
+/// `buffer` - capacity, in batches, of the internal buffer
+pub fn read_spill_as_stream(
+ path: RefCountedTempFile,
+ schema: SchemaRef,
+ buffer: usize,
+) -> Result<SendableRecordBatchStream> {
+ let mut builder = RecordBatchReceiverStream::builder(schema, buffer);
+ let sender = builder.tx();
+
+ builder.spawn_blocking(move || read_spill(sender, path.path()));
+
+ Ok(builder.build())
+}
+
+/// Spills in-memory `batches` to disk.
+///
+/// Returns total number of the rows spilled to disk.
+pub fn spill_record_batches(
+ batches: Vec<RecordBatch>,
+ path: PathBuf,
+ schema: SchemaRef,
+) -> Result<usize> {
+ let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
+ for batch in batches {
+ writer.write(&batch)?;
+ }
+ writer.finish()?;
+ debug!(
+ "Spilled {} batches of total {} rows to disk, memory released {}",
+ writer.num_batches,
+ writer.num_rows,
+ human_readable_size(writer.num_bytes),
+ );
+ Ok(writer.num_rows)
+}
+
+fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
+ let file = BufReader::new(File::open(path)?);
+ let reader = FileReader::try_new(file, None)?;
+ for batch in reader {
+ sender
+ .blocking_send(batch.map_err(Into::into))
+ .map_err(|e| exec_datafusion_err!("{e}"))?;
+ }
+ Ok(())
+}
+
#[cfg(test)]
mod tests {
use std::any::Any;
diff --git a/datafusion/physical-plan/src/sorts/sort.rs
b/datafusion/physical-plan/src/sorts/sort.rs
index 2a48625345..47901591c8 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -22,45 +22,38 @@
use std::any::Any;
use std::fmt;
use std::fmt::{Debug, Formatter};
-use std::fs::File;
-use std::io::BufReader;
-use std::path::{Path, PathBuf};
use std::sync::Arc;
-use crate::common::{spawn_buffered, IPCWriter};
+use crate::common::spawn_buffered;
use crate::expressions::PhysicalSortExpr;
use crate::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
};
use crate::sorts::streaming_merge::streaming_merge;
-use crate::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
+use crate::stream::RecordBatchStreamAdapter;
use crate::topk::TopK;
use crate::{
- DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream,
ExecutionMode,
- ExecutionPlan, ExecutionPlanProperties, Partitioning, PlanProperties,
- SendableRecordBatchStream, Statistics,
+ read_spill_as_stream, spill_record_batches, DisplayAs, DisplayFormatType,
+ Distribution, EmptyRecordBatchStream, ExecutionMode, ExecutionPlan,
+ ExecutionPlanProperties, Partitioning, PlanProperties,
SendableRecordBatchStream,
+ Statistics,
};
use arrow::compute::{concat_batches, lexsort_to_indices, take, SortColumn};
use arrow::datatypes::SchemaRef;
-use arrow::ipc::reader::FileReader;
use arrow::record_batch::RecordBatch;
use arrow::row::{RowConverter, SortField};
use arrow_array::{Array, RecordBatchOptions, UInt32Array};
use arrow_schema::DataType;
-use datafusion_common::{exec_err, DataFusionError, Result};
-use datafusion_common_runtime::SpawnedTask;
+use datafusion_common::{DataFusionError, Result};
use datafusion_execution::disk_manager::RefCountedTempFile;
-use datafusion_execution::memory_pool::{
- human_readable_size, MemoryConsumer, MemoryReservation,
-};
+use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_execution::TaskContext;
use datafusion_physical_expr::LexOrdering;
use futures::{StreamExt, TryStreamExt};
-use log::{debug, error, trace};
-use tokio::sync::mpsc::Sender;
+use log::{debug, trace};
struct ExternalSorterMetrics {
/// metrics
@@ -345,7 +338,7 @@ impl ExternalSorter {
spill.path()
)));
}
- let stream = read_spill_as_stream(spill, self.schema.clone())?;
+ let stream = read_spill_as_stream(spill, self.schema.clone(),
2)?;
streams.push(stream);
}
@@ -402,7 +395,7 @@ impl ExternalSorter {
let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
let batches = std::mem::take(&mut self.in_mem_batches);
let spilled_rows =
- spill_sorted_batches(batches, spill_file.path(),
self.schema.clone()).await?;
+ spill_record_batches(batches, spill_file.path().into(),
self.schema.clone())?;
let used = self.reservation.free();
self.metrics.spill_count.add(1);
self.metrics.spilled_bytes.add(used);
@@ -667,70 +660,6 @@ pub(crate) fn lexsort_to_indices_multi_columns(
Ok(indices)
}
-/// Spills sorted `in_memory_batches` to disk.
-///
-/// Returns number of the rows spilled to disk.
-async fn spill_sorted_batches(
- batches: Vec<RecordBatch>,
- path: &Path,
- schema: SchemaRef,
-) -> Result<usize> {
- let path: PathBuf = path.into();
- let task = SpawnedTask::spawn_blocking(move || write_sorted(batches, path,
schema));
- match task.join().await {
- Ok(r) => r,
- Err(e) => exec_err!("Error occurred while spilling {e}"),
- }
-}
-
-pub(crate) fn read_spill_as_stream(
- path: RefCountedTempFile,
- schema: SchemaRef,
-) -> Result<SendableRecordBatchStream> {
- let mut builder = RecordBatchReceiverStream::builder(schema, 2);
- let sender = builder.tx();
-
- builder.spawn_blocking(move || {
- let result = read_spill(sender, path.path());
- if let Err(e) = &result {
- error!("Failure while reading spill file: {:?}. Error: {}", path,
e);
- }
- result
- });
-
- Ok(builder.build())
-}
-
-fn write_sorted(
- batches: Vec<RecordBatch>,
- path: PathBuf,
- schema: SchemaRef,
-) -> Result<usize> {
- let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
- for batch in batches {
- writer.write(&batch)?;
- }
- writer.finish()?;
- debug!(
- "Spilled {} batches of total {} rows to disk, memory released {}",
- writer.num_batches,
- writer.num_rows,
- human_readable_size(writer.num_bytes),
- );
- Ok(writer.num_rows)
-}
-
-fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
- let file = BufReader::new(File::open(path)?);
- let reader = FileReader::try_new(file, None)?;
- for batch in reader {
- sender
- .blocking_send(batch.map_err(Into::into))
- .map_err(|e| DataFusionError::Execution(format!("{e}")))?;
- }
- Ok(())
-}
-
/// Sort execution plan.
///
/// Support sorting datasets that are larger than the memory allotted
@@ -776,7 +705,7 @@ impl SortExec {
/// Specify the partitioning behavior of this sort exec
///
/// If `preserve_partitioning` is true, sorts each partition
- /// individually, producing one sorted strema for each input partition.
+ /// individually, producing one sorted stream for each input partition.
///
/// If `preserve_partitioning` is false, sorts and merges all
/// input partitions producing a single, sorted partition.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]