This is an automated email from the ASF dual-hosted git repository.
dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new bee4285d5 Feature/merge batches removal (#5300)
bee4285d5 is described below
commit bee4285d56b5cfea7a00203ba7aa284ee3231b68
Author: Berkay Şahin <[email protected]>
AuthorDate: Thu Feb 16 17:27:16 2023 +0300
Feature/merge batches removal (#5300)
* Remove merge_batches function
merge_batches function is replaced by concat_batches function from
arrow::compute
* Redundant merge_multiple_batches function is also removed
---
datafusion/core/src/physical_plan/common.rs | 45 ----------------------
.../windows/bounded_window_agg_exec.rs | 14 +++----
2 files changed, 7 insertions(+), 52 deletions(-)
diff --git a/datafusion/core/src/physical_plan/common.rs b/datafusion/core/src/physical_plan/common.rs
index afe781c0f..caf9b5ebc 100644
--- a/datafusion/core/src/physical_plan/common.rs
+++ b/datafusion/core/src/physical_plan/common.rs
@@ -22,9 +22,7 @@ use crate::error::{DataFusionError, Result};
use crate::execution::context::TaskContext;
use crate::physical_plan::metrics::MemTrackingMetrics;
use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan,
Statistics};
-use arrow::compute::concat;
use arrow::datatypes::{Schema, SchemaRef};
-use arrow::error::ArrowError;
use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
use arrow::record_batch::RecordBatch;
use datafusion_physical_expr::PhysicalSortExpr;
@@ -93,49 +91,6 @@ pub async fn collect(stream: SendableRecordBatchStream) -> Result<Vec<RecordBatc
stream.try_collect::<Vec<_>>().await
}
-/// Merge two record batch references into a single record batch.
-/// All the record batches inside the slice must have the same schema.
-pub fn merge_batches(
- first: &RecordBatch,
- second: &RecordBatch,
- schema: SchemaRef,
-) -> Result<RecordBatch> {
- let columns = (0..schema.fields.len())
- .map(|index| {
- let first_column = first.column(index).as_ref();
- let second_column = second.column(index).as_ref();
- concat(&[first_column, second_column])
- })
- .collect::<Result<Vec<_>, ArrowError>>()
- .map_err(Into::<DataFusionError>::into)?;
- RecordBatch::try_new(schema, columns).map_err(Into::into)
-}
-
-/// Merge a slice of record batch references into a single record batch, or
-/// return `None` if the slice itself is empty. All the record batches inside the
-/// slice must have the same schema.
-pub fn merge_multiple_batches(
- batches: &[&RecordBatch],
- schema: SchemaRef,
-) -> Result<Option<RecordBatch>> {
- Ok(if batches.is_empty() {
- None
- } else {
- let columns = (0..schema.fields.len())
- .map(|index| {
- concat(
- &batches
- .iter()
- .map(|batch| batch.column(index).as_ref())
- .collect::<Vec<_>>(),
- )
- })
- .collect::<Result<Vec<_>, ArrowError>>()
- .map_err(Into::<DataFusionError>::into)?;
- Some(RecordBatch::try_new(schema, columns)?)
- })
-}
-
/// Recursively builds a list of files in a directory with a given extension
pub fn build_checked_file_list(dir: &str, ext: &str) -> Result<Vec<String>> {
let mut filenames: Vec<String> = Vec::new();
diff --git a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
index 0d929d135..0ac836cf2 100644
--- a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
@@ -31,7 +31,9 @@ use crate::physical_plan::{
RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr,
};
use arrow::array::Array;
-use arrow::compute::{concat, lexicographical_partition_ranges, SortColumn};
+use arrow::compute::{
+ concat, concat_batches, lexicographical_partition_ranges, SortColumn,
+};
use arrow::{
array::ArrayRef,
datatypes::{Schema, SchemaRef},
@@ -48,7 +50,6 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
-use crate::physical_plan::common::merge_batches;
use datafusion_physical_expr::window::{
PartitionBatchState, PartitionBatches, PartitionKey,
PartitionWindowAggStates,
WindowAggState, WindowState,
@@ -381,10 +382,9 @@ impl PartitionByHandler for SortedPartitionByBoundedWindowStream {
if let Some(partition_batch_state) =
self.partition_buffers.get_mut(&partition_row)
{
- partition_batch_state.record_batch = merge_batches(
- &partition_batch_state.record_batch,
- &partition_batch,
- self.input.schema(),
+ partition_batch_state.record_batch = concat_batches(
+ &self.input.schema(),
+                    [&partition_batch_state.record_batch, &partition_batch],
)?;
} else {
let partition_batch_state = PartitionBatchState {
@@ -405,7 +405,7 @@ impl PartitionByHandler for SortedPartitionByBoundedWindowStream {
self.input_buffer = if self.input_buffer.num_rows() == 0 {
record_batch
} else {
-            merge_batches(&self.input_buffer, &record_batch, self.input.schema())?
+            concat_batches(&self.input.schema(), [&self.input_buffer, &record_batch])?
};
Ok(())