This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new bee4285d5 Feature/merge batches removal (#5300)
bee4285d5 is described below

commit bee4285d56b5cfea7a00203ba7aa284ee3231b68
Author: Berkay Şahin <[email protected]>
AuthorDate: Thu Feb 16 17:27:16 2023 +0300

    Feature/merge batches removal (#5300)
    
    * Remove merge_batches function
    
    merge_batches function is replaced by concat_batches function from 
arrow::compute
    
    * Redundant merge_multiple_batches function is also removed
---
 datafusion/core/src/physical_plan/common.rs        | 45 ----------------------
 .../windows/bounded_window_agg_exec.rs             | 14 +++----
 2 files changed, 7 insertions(+), 52 deletions(-)

diff --git a/datafusion/core/src/physical_plan/common.rs 
b/datafusion/core/src/physical_plan/common.rs
index afe781c0f..caf9b5ebc 100644
--- a/datafusion/core/src/physical_plan/common.rs
+++ b/datafusion/core/src/physical_plan/common.rs
@@ -22,9 +22,7 @@ use crate::error::{DataFusionError, Result};
 use crate::execution::context::TaskContext;
 use crate::physical_plan::metrics::MemTrackingMetrics;
 use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan, 
Statistics};
-use arrow::compute::concat;
 use arrow::datatypes::{Schema, SchemaRef};
-use arrow::error::ArrowError;
 use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
 use arrow::record_batch::RecordBatch;
 use datafusion_physical_expr::PhysicalSortExpr;
@@ -93,49 +91,6 @@ pub async fn collect(stream: SendableRecordBatchStream) -> 
Result<Vec<RecordBatc
     stream.try_collect::<Vec<_>>().await
 }
 
-/// Merge two record batch references into a single record batch.
-/// All the record batches inside the slice must have the same schema.
-pub fn merge_batches(
-    first: &RecordBatch,
-    second: &RecordBatch,
-    schema: SchemaRef,
-) -> Result<RecordBatch> {
-    let columns = (0..schema.fields.len())
-        .map(|index| {
-            let first_column = first.column(index).as_ref();
-            let second_column = second.column(index).as_ref();
-            concat(&[first_column, second_column])
-        })
-        .collect::<Result<Vec<_>, ArrowError>>()
-        .map_err(Into::<DataFusionError>::into)?;
-    RecordBatch::try_new(schema, columns).map_err(Into::into)
-}
-
-/// Merge a slice of record batch references into a single record batch, or
-/// return `None` if the slice itself is empty. All the record batches inside 
the
-/// slice must have the same schema.
-pub fn merge_multiple_batches(
-    batches: &[&RecordBatch],
-    schema: SchemaRef,
-) -> Result<Option<RecordBatch>> {
-    Ok(if batches.is_empty() {
-        None
-    } else {
-        let columns = (0..schema.fields.len())
-            .map(|index| {
-                concat(
-                    &batches
-                        .iter()
-                        .map(|batch| batch.column(index).as_ref())
-                        .collect::<Vec<_>>(),
-                )
-            })
-            .collect::<Result<Vec<_>, ArrowError>>()
-            .map_err(Into::<DataFusionError>::into)?;
-        Some(RecordBatch::try_new(schema, columns)?)
-    })
-}
-
 /// Recursively builds a list of files in a directory with a given extension
 pub fn build_checked_file_list(dir: &str, ext: &str) -> Result<Vec<String>> {
     let mut filenames: Vec<String> = Vec::new();
diff --git 
a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs 
b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
index 0d929d135..0ac836cf2 100644
--- a/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
+++ b/datafusion/core/src/physical_plan/windows/bounded_window_agg_exec.rs
@@ -31,7 +31,9 @@ use crate::physical_plan::{
     RecordBatchStream, SendableRecordBatchStream, Statistics, WindowExpr,
 };
 use arrow::array::Array;
-use arrow::compute::{concat, lexicographical_partition_ranges, SortColumn};
+use arrow::compute::{
+    concat, concat_batches, lexicographical_partition_ranges, SortColumn,
+};
 use arrow::{
     array::ArrayRef,
     datatypes::{Schema, SchemaRef},
@@ -48,7 +50,6 @@ use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
-use crate::physical_plan::common::merge_batches;
 use datafusion_physical_expr::window::{
     PartitionBatchState, PartitionBatches, PartitionKey, 
PartitionWindowAggStates,
     WindowAggState, WindowState,
@@ -381,10 +382,9 @@ impl PartitionByHandler for 
SortedPartitionByBoundedWindowStream {
                 if let Some(partition_batch_state) =
                     self.partition_buffers.get_mut(&partition_row)
                 {
-                    partition_batch_state.record_batch = merge_batches(
-                        &partition_batch_state.record_batch,
-                        &partition_batch,
-                        self.input.schema(),
+                    partition_batch_state.record_batch = concat_batches(
+                        &self.input.schema(),
+                        [&partition_batch_state.record_batch, 
&partition_batch],
                     )?;
                 } else {
                     let partition_batch_state = PartitionBatchState {
@@ -405,7 +405,7 @@ impl PartitionByHandler for 
SortedPartitionByBoundedWindowStream {
         self.input_buffer = if self.input_buffer.num_rows() == 0 {
             record_batch
         } else {
-            merge_batches(&self.input_buffer, &record_batch, 
self.input.schema())?
+            concat_batches(&self.input.schema(), [&self.input_buffer, 
&record_batch])?
         };
 
         Ok(())

Reply via email to