This is an automated email from the ASF dual-hosted git repository. tustvold pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push: new 7cf248b8cc Remove SizedRecordBatchStream (#6309) 7cf248b8cc is described below commit 7cf248b8cce13e619dde75871e308c75977770fe Author: Raphael Taylor-Davies <1781103+tustv...@users.noreply.github.com> AuthorDate: Tue May 9 22:59:22 2023 +0100 Remove SizedRecordBatchStream (#6309) --- datafusion/core/src/physical_plan/common.rs | 56 ++--------------------- datafusion/core/src/physical_plan/explain.rs | 18 ++------ datafusion/core/tests/provider_filter_pushdown.rs | 21 ++++----- 3 files changed, 16 insertions(+), 79 deletions(-) diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs index 42cd8fada9..ce1299fb6d 100644 --- a/datafusion/core/src/physical_plan/common.rs +++ b/datafusion/core/src/physical_plan/common.rs @@ -17,17 +17,16 @@ //! Defines common code used in execution plans -use super::{RecordBatchStream, SendableRecordBatchStream}; +use super::SendableRecordBatchStream; use crate::error::{DataFusionError, Result}; use crate::execution::context::TaskContext; use crate::execution::memory_pool::MemoryReservation; -use crate::physical_plan::metrics::MemTrackingMetrics; use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan, Statistics}; -use arrow::datatypes::{Schema, SchemaRef}; +use arrow::datatypes::Schema; use arrow::ipc::writer::{FileWriter, IpcWriteOptions}; use arrow::record_batch::RecordBatch; use datafusion_physical_expr::PhysicalSortExpr; -use futures::{Future, Stream, StreamExt, TryStreamExt}; +use futures::{Future, StreamExt, TryStreamExt}; use log::debug; use parking_lot::Mutex; use pin_project_lite::pin_project; @@ -42,55 +41,6 @@ use tokio::task::JoinHandle; /// [`MemoryReservation`] used across query execution streams pub(crate) type SharedMemoryReservation = Arc<Mutex<MemoryReservation>>; -/// Stream of record batches -pub struct SizedRecordBatchStream { - schema: SchemaRef, - batches: Vec<Arc<RecordBatch>>, - index: usize, - metrics: MemTrackingMetrics, -} - -impl SizedRecordBatchStream { - /// Create a new RecordBatchIterator - pub fn new( - schema: SchemaRef, - batches: Vec<Arc<RecordBatch>>, - mut metrics: MemTrackingMetrics, - ) -> Self { - let size = batches.iter().map(|b| batch_byte_size(b)).sum::<usize>(); - metrics.init_mem_used(size); - SizedRecordBatchStream { - schema, - index: 0, - batches, - metrics, - } - } -} - -impl Stream for SizedRecordBatchStream { - type Item = Result<RecordBatch>; - - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - _: &mut Context<'_>, - ) -> Poll<Option<Self::Item>> { - let poll = Poll::Ready(if self.index < self.batches.len() { - self.index += 1; - Some(Ok(self.batches[self.index - 1].as_ref().clone())) - } else { - None - }); - self.metrics.record_poll(poll) - } -} - -impl RecordBatchStream for SizedRecordBatchStream { - fn schema(&self) -> SchemaRef { - self.schema.clone() - } -} - /// Create a vector of record batches from a stream pub async fn collect(stream: SendableRecordBatchStream) -> Result<Vec<RecordBatch>> { stream.try_collect::<Vec<_>>().await diff --git a/datafusion/core/src/physical_plan/explain.rs b/datafusion/core/src/physical_plan/explain.rs index c46a4a7391..6eb72e4ff3 100644 --- a/datafusion/core/src/physical_plan/explain.rs +++ b/datafusion/core/src/physical_plan/explain.rs @@ -23,17 +23,14 @@ use std::sync::Arc; use crate::{ error::{DataFusionError, Result}, logical_expr::StringifiedPlan, - physical_plan::{ - common::SizedRecordBatchStream, DisplayFormatType, ExecutionPlan, Partitioning, - Statistics, - }, + physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning, Statistics}, }; use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch}; use log::trace; use super::{expressions::PhysicalSortExpr, SendableRecordBatchStream}; use crate::execution::context::TaskContext; -use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics}; +use crate::physical_plan::stream::RecordBatchStreamAdapter; /// Explain execution plan operator. This operator contains the string /// values of the various plans it has when it is created, and passes @@ -150,17 +147,12 @@ impl ExecutionPlan for ExplainExec { ], )?; - let metrics = ExecutionPlanMetricsSet::new(); - let tracking_metrics = - MemTrackingMetrics::new(&metrics, context.memory_pool(), partition); - trace!( - "Before returning SizedRecordBatch in ExplainExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id()); + "Before returning RecordBatchStream in ExplainExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id()); - Ok(Box::pin(SizedRecordBatchStream::new( + Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema.clone(), - vec![Arc::new(record_batch)], - tracking_metrics, + futures::stream::iter(vec![Ok(record_batch)]), ))) } diff --git a/datafusion/core/tests/provider_filter_pushdown.rs b/datafusion/core/tests/provider_filter_pushdown.rs index 36b0789829..ac1eef850d 100644 --- a/datafusion/core/tests/provider_filter_pushdown.rs +++ b/datafusion/core/tests/provider_filter_pushdown.rs @@ -23,9 +23,8 @@ use datafusion::datasource::datasource::{TableProvider, TableType}; use datafusion::error::Result; use datafusion::execution::context::{SessionContext, SessionState, TaskContext}; use datafusion::logical_expr::{Expr, TableProviderFilterPushDown}; -use datafusion::physical_plan::common::SizedRecordBatchStream; use datafusion::physical_plan::expressions::PhysicalSortExpr; -use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics}; +use datafusion::physical_plan::stream::RecordBatchStreamAdapter; use datafusion::physical_plan::{ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; @@ -56,7 +55,7 @@ fn create_batch(value: i32, num_rows: usize) -> Result<RecordBatch> { #[derive(Debug)] struct CustomPlan { schema: SchemaRef, - batches: Vec<Arc<RecordBatch>>, + batches: Vec<RecordBatch>, } impl ExecutionPlan for CustomPlan { @@ -89,16 +88,12 @@ impl ExecutionPlan for CustomPlan { fn execute( &self, - partition: usize, - context: Arc<TaskContext>, + _partition: usize, + _context: Arc<TaskContext>, ) -> Result<SendableRecordBatchStream> { - let metrics = ExecutionPlanMetricsSet::new(); - let tracking_metrics = - MemTrackingMetrics::new(&metrics, context.memory_pool(), partition); - Ok(Box::pin(SizedRecordBatchStream::new( + Ok(Box::pin(RecordBatchStreamAdapter::new( self.schema(), - self.batches.clone(), - tracking_metrics, + futures::stream::iter(self.batches.clone().into_iter().map(Ok)), ))) } @@ -183,8 +178,8 @@ impl TableProvider for CustomProvider { Ok(Arc::new(CustomPlan { schema: self.zero_batch.schema(), batches: match int_value { - 0 => vec![Arc::new(self.zero_batch.clone())], - 1 => vec![Arc::new(self.one_batch.clone())], + 0 => vec![self.zero_batch.clone()], + 1 => vec![self.one_batch.clone()], _ => vec![], }, }))