This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 58a36b620 chore: Re-organize shuffle writer code (#1439)
58a36b620 is described below

commit 58a36b62004c1e300d7f3113d9a64713a94c0db9
Author: Andy Grove <agr...@apache.org>
AuthorDate: Tue Feb 25 17:14:18 2025 -0700

    chore: Re-organize shuffle writer code (#1439)
---
 native/core/src/execution/shuffle/builders.rs      |  599 ++++++++
 native/core/src/execution/shuffle/codec.rs         |  318 +++-
 native/core/src/execution/shuffle/mod.rs           |    9 +-
 native/core/src/execution/shuffle/row.rs           |    4 +-
 .../core/src/execution/shuffle/shuffle_writer.rs   | 1544 +++++---------------
 5 files changed, 1252 insertions(+), 1222 deletions(-)

diff --git a/native/core/src/execution/shuffle/builders.rs b/native/core/src/execution/shuffle/builders.rs
new file mode 100644
index 000000000..184467b5d
--- /dev/null
+++ b/native/core/src/execution/shuffle/builders.rs
@@ -0,0 +1,599 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_array::builder::{make_builder, ArrayBuilder};
+use std::sync::Arc;
+
+use crate::common::bit::ceil;
+use arrow::datatypes::*;
+use datafusion::arrow::{
+    array::*,
+    datatypes::{DataType, SchemaRef, TimeUnit},
+    error::Result as ArrowResult,
+    record_batch::RecordBatch,
+};
+
+pub(crate) fn new_array_builders(
+    schema: &SchemaRef,
+    batch_size: usize,
+) -> Vec<Box<dyn ArrayBuilder>> {
+    schema
+        .fields()
+        .iter()
+        .map(|field| {
+            let dt = field.data_type();
+            if matches!(dt, DataType::Dictionary(_, _)) {
+                make_dict_builder(dt, batch_size)
+            } else {
+                make_builder(dt, batch_size)
+            }
+        })
+        .collect::<Vec<_>>()
+}
+
+macro_rules! primitive_dict_builder_inner_helper {
+    ($kt:ty, $vt:ty, $capacity:ident) => {
+        Box::new(PrimitiveDictionaryBuilder::<$kt, $vt>::with_capacity(
+            $capacity,
+            $capacity / 100,
+        ))
+    };
+}
+
+macro_rules! primitive_dict_builder_helper {
+    ($kt:ty, $vt:ident, $capacity:ident) => {
+        match $vt.as_ref() {
+            DataType::Int8 => {
+                primitive_dict_builder_inner_helper!($kt, Int8Type, $capacity)
+            }
+            DataType::Int16 => {
+                primitive_dict_builder_inner_helper!($kt, Int16Type, $capacity)
+            }
+            DataType::Int32 => {
+                primitive_dict_builder_inner_helper!($kt, Int32Type, $capacity)
+            }
+            DataType::Int64 => {
+                primitive_dict_builder_inner_helper!($kt, Int64Type, $capacity)
+            }
+            DataType::UInt8 => {
+                primitive_dict_builder_inner_helper!($kt, UInt8Type, $capacity)
+            }
+            DataType::UInt16 => {
+                primitive_dict_builder_inner_helper!($kt, UInt16Type, $capacity)
+            }
+            DataType::UInt32 => {
+                primitive_dict_builder_inner_helper!($kt, UInt32Type, $capacity)
+            }
+            DataType::UInt64 => {
+                primitive_dict_builder_inner_helper!($kt, UInt64Type, $capacity)
+            }
+            DataType::Float32 => {
+                primitive_dict_builder_inner_helper!($kt, Float32Type, $capacity)
+            }
+            DataType::Float64 => {
+                primitive_dict_builder_inner_helper!($kt, Float64Type, $capacity)
+            }
+            DataType::Decimal128(p, s) => {
+                let keys_builder = PrimitiveBuilder::<$kt>::new();
+                let values_builder =
+                    Decimal128Builder::new().with_data_type(DataType::Decimal128(*p, *s));
+                Box::new(
+                    PrimitiveDictionaryBuilder::<$kt, Decimal128Type>::new_from_empty_builders(
+                        keys_builder,
+                        values_builder,
+                    ),
+                )
+            }
+            DataType::Timestamp(TimeUnit::Microsecond, timezone) => {
+                let keys_builder = PrimitiveBuilder::<$kt>::new();
+                let values_builder = TimestampMicrosecondBuilder::new()
+                    .with_data_type(DataType::Timestamp(TimeUnit::Microsecond, timezone.clone()));
+                Box::new(
+                    PrimitiveDictionaryBuilder::<$kt, TimestampMicrosecondType>::new_from_empty_builders(
+                        keys_builder,
+                        values_builder,
+                    ),
+                )
+            }
+            DataType::Date32 => {
+                primitive_dict_builder_inner_helper!($kt, Date32Type, $capacity)
+            }
+            DataType::Date64 => {
+                primitive_dict_builder_inner_helper!($kt, Date64Type, $capacity)
+            }
+            t => unimplemented!("{:?} is not supported", t),
+        }
+    };
+}
+
+macro_rules! byte_dict_builder_inner_helper {
+    ($kt:ty, $capacity:ident, $builder:ident) => {
+        Box::new($builder::<$kt>::with_capacity(
+            $capacity,
+            $capacity / 100,
+            $capacity,
+        ))
+    };
+}
+
+/// Returns a dictionary array builder with capacity `capacity` that corresponds to the
+/// datatype `DataType`. This function is useful for constructing arrays from arbitrary
+/// vectors with a known/expected schema.
+/// TODO: move this upstream.
+fn make_dict_builder(datatype: &DataType, capacity: usize) -> Box<dyn ArrayBuilder> {
+    match datatype {
+        DataType::Dictionary(key_type, value_type) if value_type.is_primitive() => {
+            match key_type.as_ref() {
+                DataType::Int8 => primitive_dict_builder_helper!(Int8Type, value_type, capacity),
+                DataType::Int16 => primitive_dict_builder_helper!(Int16Type, value_type, capacity),
+                DataType::Int32 => primitive_dict_builder_helper!(Int32Type, value_type, capacity),
+                DataType::Int64 => primitive_dict_builder_helper!(Int64Type, value_type, capacity),
+                DataType::UInt8 => primitive_dict_builder_helper!(UInt8Type, value_type, capacity),
+                DataType::UInt16 => {
+                    primitive_dict_builder_helper!(UInt16Type, value_type, capacity)
+                }
+                DataType::UInt32 => {
+                    primitive_dict_builder_helper!(UInt32Type, value_type, capacity)
+                }
+                DataType::UInt64 => {
+                    primitive_dict_builder_helper!(UInt64Type, value_type, capacity)
+                }
+                _ => unreachable!(""),
+            }
+        }
+        DataType::Dictionary(key_type, value_type)
+            if matches!(value_type.as_ref(), DataType::Utf8) =>
+        {
+            match key_type.as_ref() {
+                DataType::Int8 => {
+                    byte_dict_builder_inner_helper!(Int8Type, capacity, StringDictionaryBuilder)
+                }
+                DataType::Int16 => {
+                    byte_dict_builder_inner_helper!(Int16Type, capacity, StringDictionaryBuilder)
+                }
+                DataType::Int32 => {
+                    byte_dict_builder_inner_helper!(Int32Type, capacity, StringDictionaryBuilder)
+                }
+                DataType::Int64 => {
+                    byte_dict_builder_inner_helper!(Int64Type, capacity, StringDictionaryBuilder)
+                }
+                DataType::UInt8 => {
+                    byte_dict_builder_inner_helper!(UInt8Type, capacity, StringDictionaryBuilder)
+                }
+                DataType::UInt16 => {
+                    byte_dict_builder_inner_helper!(UInt16Type, capacity, StringDictionaryBuilder)
+                }
+                DataType::UInt32 => {
+                    byte_dict_builder_inner_helper!(UInt32Type, capacity, StringDictionaryBuilder)
+                }
+                DataType::UInt64 => {
+                    byte_dict_builder_inner_helper!(UInt64Type, capacity, StringDictionaryBuilder)
+                }
+                _ => unreachable!(""),
+            }
+        }
+        DataType::Dictionary(key_type, value_type)
+            if matches!(value_type.as_ref(), DataType::LargeUtf8) =>
+        {
+            match key_type.as_ref() {
+                DataType::Int8 => byte_dict_builder_inner_helper!(
+                    Int8Type,
+                    capacity,
+                    LargeStringDictionaryBuilder
+                ),
+                DataType::Int16 => byte_dict_builder_inner_helper!(
+                    Int16Type,
+                    capacity,
+                    LargeStringDictionaryBuilder
+                ),
+                DataType::Int32 => byte_dict_builder_inner_helper!(
+                    Int32Type,
+                    capacity,
+                    LargeStringDictionaryBuilder
+                ),
+                DataType::Int64 => byte_dict_builder_inner_helper!(
+                    Int64Type,
+                    capacity,
+                    LargeStringDictionaryBuilder
+                ),
+                DataType::UInt8 => byte_dict_builder_inner_helper!(
+                    UInt8Type,
+                    capacity,
+                    LargeStringDictionaryBuilder
+                ),
+                DataType::UInt16 => {
+                    byte_dict_builder_inner_helper!(
+                        UInt16Type,
+                        capacity,
+                        LargeStringDictionaryBuilder
+                    )
+                }
+                DataType::UInt32 => {
+                    byte_dict_builder_inner_helper!(
+                        UInt32Type,
+                        capacity,
+                        LargeStringDictionaryBuilder
+                    )
+                }
+                DataType::UInt64 => {
+                    byte_dict_builder_inner_helper!(
+                        UInt64Type,
+                        capacity,
+                        LargeStringDictionaryBuilder
+                    )
+                }
+                _ => unreachable!(""),
+            }
+        }
+        DataType::Dictionary(key_type, value_type)
+            if matches!(value_type.as_ref(), DataType::Binary) =>
+        {
+            match key_type.as_ref() {
+                DataType::Int8 => {
+                    byte_dict_builder_inner_helper!(Int8Type, capacity, BinaryDictionaryBuilder)
+                }
+                DataType::Int16 => {
+                    byte_dict_builder_inner_helper!(Int16Type, capacity, BinaryDictionaryBuilder)
+                }
+                DataType::Int32 => {
+                    byte_dict_builder_inner_helper!(Int32Type, capacity, BinaryDictionaryBuilder)
+                }
+                DataType::Int64 => {
+                    byte_dict_builder_inner_helper!(Int64Type, capacity, BinaryDictionaryBuilder)
+                }
+                DataType::UInt8 => {
+                    byte_dict_builder_inner_helper!(UInt8Type, capacity, BinaryDictionaryBuilder)
+                }
+                DataType::UInt16 => {
+                    byte_dict_builder_inner_helper!(UInt16Type, capacity, BinaryDictionaryBuilder)
+                }
+                DataType::UInt32 => {
+                    byte_dict_builder_inner_helper!(UInt32Type, capacity, BinaryDictionaryBuilder)
+                }
+                DataType::UInt64 => {
+                    byte_dict_builder_inner_helper!(UInt64Type, capacity, BinaryDictionaryBuilder)
+                }
+                _ => unreachable!(""),
+            }
+        }
+        DataType::Dictionary(key_type, value_type)
+            if matches!(value_type.as_ref(), DataType::LargeBinary) =>
+        {
+            match key_type.as_ref() {
+                DataType::Int8 => byte_dict_builder_inner_helper!(
+                    Int8Type,
+                    capacity,
+                    LargeBinaryDictionaryBuilder
+                ),
+                DataType::Int16 => byte_dict_builder_inner_helper!(
+                    Int16Type,
+                    capacity,
+                    LargeBinaryDictionaryBuilder
+                ),
+                DataType::Int32 => byte_dict_builder_inner_helper!(
+                    Int32Type,
+                    capacity,
+                    LargeBinaryDictionaryBuilder
+                ),
+                DataType::Int64 => byte_dict_builder_inner_helper!(
+                    Int64Type,
+                    capacity,
+                    LargeBinaryDictionaryBuilder
+                ),
+                DataType::UInt8 => byte_dict_builder_inner_helper!(
+                    UInt8Type,
+                    capacity,
+                    LargeBinaryDictionaryBuilder
+                ),
+                DataType::UInt16 => {
+                    byte_dict_builder_inner_helper!(
+                        UInt16Type,
+                        capacity,
+                        LargeBinaryDictionaryBuilder
+                    )
+                }
+                DataType::UInt32 => {
+                    byte_dict_builder_inner_helper!(
+                        UInt32Type,
+                        capacity,
+                        LargeBinaryDictionaryBuilder
+                    )
+                }
+                DataType::UInt64 => {
+                    byte_dict_builder_inner_helper!(
+                        UInt64Type,
+                        capacity,
+                        LargeBinaryDictionaryBuilder
+                    )
+                }
+                _ => unreachable!(""),
+            }
+        }
+        t => panic!("Data type {t:?} is not currently supported"),
+    }
+}
+
+pub(crate) fn slot_size(len: usize, data_type: &DataType) -> usize {
+    match data_type {
+        DataType::Boolean => ceil(len, 8),
+        DataType::Int8 => len,
+        DataType::Int16 => len * 2,
+        DataType::Int32 => len * 4,
+        DataType::Int64 => len * 8,
+        DataType::UInt8 => len,
+        DataType::UInt16 => len * 2,
+        DataType::UInt32 => len * 4,
+        DataType::UInt64 => len * 8,
+        DataType::Float32 => len * 4,
+        DataType::Float64 => len * 8,
+        DataType::Date32 => len * 4,
+        DataType::Date64 => len * 8,
+        DataType::Time32(TimeUnit::Second) => len * 4,
+        DataType::Time32(TimeUnit::Millisecond) => len * 4,
+        DataType::Time64(TimeUnit::Microsecond) => len * 8,
+        DataType::Time64(TimeUnit::Nanosecond) => len * 8,
+        // TODO: this is not accurate, but should be good enough for now
+        DataType::Utf8 => len * 100 + len * 4,
+        DataType::LargeUtf8 => len * 100 + len * 8,
+        DataType::Decimal128(_, _) => len * 16,
+        DataType::Dictionary(key_type, value_type) => {
+            // TODO: this is not accurate, but should be good enough for now
+            slot_size(len, key_type.as_ref()) + slot_size(len / 10, value_type.as_ref())
+        }
+        // TODO: this is not accurate, but should be good enough for now
+        DataType::Binary => len * 100 + len * 4,
+        DataType::LargeBinary => len * 100 + len * 8,
+        DataType::FixedSizeBinary(s) => len * (*s as usize),
+        DataType::Timestamp(_, _) => len * 8,
+        dt => unimplemented!(
+            "{}",
+            format!("data type {dt} not supported in shuffle write")
+        ),
+    }
+}
+
+pub(crate) fn append_columns(
+    to: &mut Box<dyn ArrayBuilder>,
+    from: &Arc<dyn Array>,
+    indices: &[usize],
+    data_type: &DataType,
+) {
+    /// Append values from `from` to `to` using `indices`.
+    macro_rules! append {
+        ($arrowty:ident) => {{
+            type B = paste::paste! {[< $arrowty Builder >]};
+            type A = paste::paste! {[< $arrowty Array >]};
+            let t = to.as_any_mut().downcast_mut::<B>().unwrap();
+            let f = from.as_any().downcast_ref::<A>().unwrap();
+            for &i in indices {
+                if f.is_valid(i) {
+                    t.append_value(f.value(i));
+                } else {
+                    t.append_null();
+                }
+            }
+        }};
+    }
+
+    /// Some array builders (e.g. `FixedSizeBinary`) have an `append_value` method that
+    /// returns a `Result`.
+    macro_rules! append_unwrap {
+        ($arrowty:ident) => {{
+            type B = paste::paste! {[< $arrowty Builder >]};
+            type A = paste::paste! {[< $arrowty Array >]};
+            let t = to.as_any_mut().downcast_mut::<B>().unwrap();
+            let f = from.as_any().downcast_ref::<A>().unwrap();
+            for &i in indices {
+                if f.is_valid(i) {
+                    t.append_value(f.value(i)).unwrap();
+                } else {
+                    t.append_null();
+                }
+            }
+        }};
+    }
+
+    /// Appends values from a dictionary array to a dictionary builder.
+    macro_rules! append_dict {
+        ($kt:ty, $builder:ty, $dict_array:ty) => {{
+            let t = to.as_any_mut().downcast_mut::<$builder>().unwrap();
+            let f = from
+                .as_any()
+                .downcast_ref::<DictionaryArray<$kt>>()
+                .unwrap()
+                .downcast_dict::<$dict_array>()
+                .unwrap();
+            for &i in indices {
+                if f.is_valid(i) {
+                    t.append_value(f.value(i));
+                } else {
+                    t.append_null();
+                }
+            }
+        }};
+    }
+
+    macro_rules! append_dict_helper {
+        ($kt:ident, $ty:ty, $dict_array:ty) => {{
+            match $kt.as_ref() {
+                DataType::Int8 => append_dict!(Int8Type, PrimitiveDictionaryBuilder<Int8Type, $ty>, $dict_array),
+                DataType::Int16 => append_dict!(Int16Type, PrimitiveDictionaryBuilder<Int16Type, $ty>, $dict_array),
+                DataType::Int32 => append_dict!(Int32Type, PrimitiveDictionaryBuilder<Int32Type, $ty>, $dict_array),
+                DataType::Int64 => append_dict!(Int64Type, PrimitiveDictionaryBuilder<Int64Type, $ty>, $dict_array),
+                DataType::UInt8 => append_dict!(UInt8Type, PrimitiveDictionaryBuilder<UInt8Type, $ty>, $dict_array),
+                DataType::UInt16 => {
+                    append_dict!(UInt16Type, PrimitiveDictionaryBuilder<UInt16Type, $ty>, $dict_array)
+                }
+                DataType::UInt32 => {
+                    append_dict!(UInt32Type, PrimitiveDictionaryBuilder<UInt32Type, $ty>, $dict_array)
+                }
+                DataType::UInt64 => {
+                    append_dict!(UInt64Type, PrimitiveDictionaryBuilder<UInt64Type, $ty>, $dict_array)
+                }
+                _ => unreachable!("Unknown key type for dictionary"),
+            }
+        }};
+    }
+
+    macro_rules! primitive_append_dict_helper {
+        ($kt:ident, $vt:ident) => {
+            match $vt.as_ref() {
+                DataType::Int8 => {
+                    append_dict_helper!($kt, Int8Type, Int8Array)
+                }
+                DataType::Int16 => {
+                    append_dict_helper!($kt, Int16Type, Int16Array)
+                }
+                DataType::Int32 => {
+                    append_dict_helper!($kt, Int32Type, Int32Array)
+                }
+                DataType::Int64 => {
+                    append_dict_helper!($kt, Int64Type, Int64Array)
+                }
+                DataType::UInt8 => {
+                    append_dict_helper!($kt, UInt8Type, UInt8Array)
+                }
+                DataType::UInt16 => {
+                    append_dict_helper!($kt, UInt16Type, UInt16Array)
+                }
+                DataType::UInt32 => {
+                    append_dict_helper!($kt, UInt32Type, UInt32Array)
+                }
+                DataType::UInt64 => {
+                    append_dict_helper!($kt, UInt64Type, UInt64Array)
+                }
+                DataType::Float32 => {
+                    append_dict_helper!($kt, Float32Type, Float32Array)
+                }
+                DataType::Float64 => {
+                    append_dict_helper!($kt, Float64Type, Float64Array)
+                }
+                DataType::Decimal128(_, _) => {
+                    append_dict_helper!($kt, Decimal128Type, Decimal128Array)
+                }
+                DataType::Timestamp(TimeUnit::Microsecond, _) => {
+                    append_dict_helper!($kt, TimestampMicrosecondType, TimestampMicrosecondArray)
+                }
+                DataType::Date32 => {
+                    append_dict_helper!($kt, Date32Type, Date32Array)
+                }
+                DataType::Date64 => {
+                    append_dict_helper!($kt, Date64Type, Date64Array)
+                }
+                t => unimplemented!("{:?} is not supported for appending dictionary builder", t),
+            }
+        };
+    }
+
+    macro_rules! append_byte_dict {
+        ($kt:ident, $byte_type:ty, $array_type:ty) => {{
+            match $kt.as_ref() {
+                DataType::Int8 => {
+                    append_dict!(Int8Type, GenericByteDictionaryBuilder<Int8Type, $byte_type>, $array_type)
+                }
+                DataType::Int16 => {
+                    append_dict!(Int16Type, GenericByteDictionaryBuilder<Int16Type, $byte_type>, $array_type)
+                }
+                DataType::Int32 => {
+                    append_dict!(Int32Type, GenericByteDictionaryBuilder<Int32Type, $byte_type>, $array_type)
+                }
+                DataType::Int64 => {
+                    append_dict!(Int64Type, GenericByteDictionaryBuilder<Int64Type, $byte_type>, $array_type)
+                }
+                DataType::UInt8 => {
+                    append_dict!(UInt8Type, GenericByteDictionaryBuilder<UInt8Type, $byte_type>, $array_type)
+                }
+                DataType::UInt16 => {
+                    append_dict!(UInt16Type, GenericByteDictionaryBuilder<UInt16Type, $byte_type>, $array_type)
+                }
+                DataType::UInt32 => {
+                    append_dict!(UInt32Type, GenericByteDictionaryBuilder<UInt32Type, $byte_type>, $array_type)
+                }
+                DataType::UInt64 => {
+                    append_dict!(UInt64Type, GenericByteDictionaryBuilder<UInt64Type, $byte_type>, $array_type)
+                }
+                _ => unreachable!("Unknown key type for dictionary"),
+            }
+        }};
+    }
+
+    match data_type {
+        DataType::Boolean => append!(Boolean),
+        DataType::Int8 => append!(Int8),
+        DataType::Int16 => append!(Int16),
+        DataType::Int32 => append!(Int32),
+        DataType::Int64 => append!(Int64),
+        DataType::UInt8 => append!(UInt8),
+        DataType::UInt16 => append!(UInt16),
+        DataType::UInt32 => append!(UInt32),
+        DataType::UInt64 => append!(UInt64),
+        DataType::Float32 => append!(Float32),
+        DataType::Float64 => append!(Float64),
+        DataType::Date32 => append!(Date32),
+        DataType::Date64 => append!(Date64),
+        DataType::Time32(TimeUnit::Second) => append!(Time32Second),
+        DataType::Time32(TimeUnit::Millisecond) => append!(Time32Millisecond),
+        DataType::Time64(TimeUnit::Microsecond) => append!(Time64Microsecond),
+        DataType::Time64(TimeUnit::Nanosecond) => append!(Time64Nanosecond),
+        DataType::Timestamp(TimeUnit::Microsecond, _) => {
+            append!(TimestampMicrosecond)
+        }
+        DataType::Utf8 => append!(String),
+        DataType::LargeUtf8 => append!(LargeString),
+        DataType::Decimal128(_, _) => append!(Decimal128),
+        DataType::Dictionary(key_type, value_type) if value_type.is_primitive() => {
+            primitive_append_dict_helper!(key_type, value_type)
+        }
+        DataType::Dictionary(key_type, value_type)
+            if matches!(value_type.as_ref(), DataType::Utf8) =>
+        {
+            append_byte_dict!(key_type, GenericStringType<i32>, StringArray)
+        }
+        DataType::Dictionary(key_type, value_type)
+            if matches!(value_type.as_ref(), DataType::LargeUtf8) =>
+        {
+            append_byte_dict!(key_type, GenericStringType<i64>, LargeStringArray)
+        }
+        DataType::Dictionary(key_type, value_type)
+            if matches!(value_type.as_ref(), DataType::Binary) =>
+        {
+            append_byte_dict!(key_type, GenericBinaryType<i32>, BinaryArray)
+        }
+        DataType::Dictionary(key_type, value_type)
+            if matches!(value_type.as_ref(), DataType::LargeBinary) =>
+        {
+            append_byte_dict!(key_type, GenericBinaryType<i64>, LargeBinaryArray)
+        }
+        DataType::Binary => append!(Binary),
+        DataType::LargeBinary => append!(LargeBinary),
+        DataType::FixedSizeBinary(_) => append_unwrap!(FixedSizeBinary),
+        t => unimplemented!(
+            "{}",
+            format!("data type {} not supported in shuffle write", t)
+        ),
+    }
+}
+
+pub(crate) fn make_batch(
+    schema: SchemaRef,
+    mut arrays: Vec<Box<dyn ArrayBuilder>>,
+    row_count: usize,
+) -> ArrowResult<RecordBatch> {
+    let columns = arrays.iter_mut().map(|array| array.finish()).collect();
+    let options = RecordBatchOptions::new().with_row_count(Option::from(row_count));
+    RecordBatch::try_new_with_options(schema, columns, &options)
+}
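
To illustrate how the relocated helpers fit together, here is a minimal,
hypothetical sketch (crate-internal, since the helpers are pub(crate)):
allocate one builder per field with new_array_builders, append rows selected
by index with append_columns, then freeze everything into a RecordBatch with
make_batch. The schema and data are made up for the example.

    use std::sync::Arc;

    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    use arrow_schema::{DataType, Field, Schema};

    use crate::execution::shuffle::builders::{append_columns, make_batch, new_array_builders};

    fn builders_example() -> datafusion::error::Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));

        // One builder per field; dictionary types get a dictionary builder.
        let mut builders = new_array_builders(&schema, 8192);

        // Append rows 0 and 2 of a source column into the matching builder.
        let source: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
        append_columns(&mut builders[0], &source, &[0, 2], &DataType::Int32);

        // Freeze the builders into a batch with an explicit row count.
        let batch: RecordBatch = make_batch(schema, builders, 2)?;
        assert_eq!(batch.num_rows(), 2);
        Ok(())
    }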
diff --git a/native/core/src/execution/shuffle/codec.rs b/native/core/src/execution/shuffle/codec.rs
index 3c735434c..8d7b45431 100644
--- a/native/core/src/execution/shuffle/codec.rs
+++ b/native/core/src/execution/shuffle/codec.rs
@@ -15,7 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::errors::{CometError, CometResult};
 use crate::parquet::data_type::AsBytes;
+use arrow::ipc::reader::StreamReader;
+use arrow::ipc::writer::StreamWriter;
 use arrow_array::cast::AsArray;
 use arrow_array::types::Int32Type;
 use arrow_array::{
@@ -25,8 +28,13 @@ use arrow_array::{
 };
 use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer, ScalarBuffer};
 use arrow_schema::{DataType, Field, Schema, TimeUnit};
+use bytes::Buf;
+use crc32fast::Hasher;
+use datafusion::error::Result;
+use datafusion::physical_plan::metrics::Time;
 use datafusion_common::DataFusionError;
-use std::io::Write;
+use simd_adler32::Adler32;
+use std::io::{Cursor, Read, Seek, SeekFrom, Write};
 use std::sync::Arc;
 
 pub fn fast_codec_supports_type(data_type: &DataType) -> bool {
@@ -706,3 +714,311 @@ mod test {
         .unwrap()
     }
 }
+
+#[derive(Debug, Clone)]
+pub enum CompressionCodec {
+    None,
+    Lz4Frame,
+    Zstd(i32),
+    Snappy,
+}
+
+pub struct ShuffleBlockWriter {
+    fast_encoding: bool,
+    codec: CompressionCodec,
+    encoded_schema: Vec<u8>,
+    header_bytes: Vec<u8>,
+}
+
+impl ShuffleBlockWriter {
+    pub fn try_new(
+        schema: &Schema,
+        enable_fast_encoding: bool,
+        codec: CompressionCodec,
+    ) -> Result<Self> {
+        let mut encoded_schema = vec![];
+
+        let enable_fast_encoding = enable_fast_encoding
+            && schema
+                .fields()
+                .iter()
+                .all(|f| fast_codec_supports_type(f.data_type()));
+
+        // encode the schema once and then reuse the encoded bytes for each batch
+        if enable_fast_encoding {
+            let mut w = BatchWriter::new(&mut encoded_schema);
+            w.write_partial_schema(schema)?;
+        }
+
+        let header_bytes = Vec::with_capacity(24);
+        let mut cursor = Cursor::new(header_bytes);
+
+        // leave space for compressed message length
+        cursor.seek_relative(8)?;
+
+        // write the number of columns because the JVM side needs to know how many addresses to allocate
+        let field_count = schema.fields().len();
+        cursor.write_all(&field_count.to_le_bytes())?;
+
+        // write compression codec to header
+        let codec_header = match &codec {
+            CompressionCodec::Snappy => b"SNAP",
+            CompressionCodec::Lz4Frame => b"LZ4_",
+            CompressionCodec::Zstd(_) => b"ZSTD",
+            CompressionCodec::None => b"NONE",
+        };
+        cursor.write_all(codec_header)?;
+
+        // write encoding scheme
+        if enable_fast_encoding {
+            cursor.write_all(b"FAST")?;
+        } else {
+            cursor.write_all(b"AIPC")?;
+        }
+
+        let header_bytes = cursor.into_inner();
+
+        Ok(Self {
+            fast_encoding: enable_fast_encoding,
+            codec,
+            encoded_schema,
+            header_bytes,
+        })
+    }
+
+    /// Writes the given record batch as Arrow IPC bytes into the given writer.
+    /// Returns the number of bytes written.
+    pub fn write_batch<W: Write + Seek>(
+        &self,
+        batch: &RecordBatch,
+        output: &mut W,
+        ipc_time: &Time,
+    ) -> Result<usize> {
+        if batch.num_rows() == 0 {
+            return Ok(0);
+        }
+
+        let mut timer = ipc_time.timer();
+        let start_pos = output.stream_position()?;
+
+        // write header
+        output.write_all(&self.header_bytes)?;
+
+        let output = if self.fast_encoding {
+            match &self.codec {
+                CompressionCodec::None => {
+                    let mut fast_writer = BatchWriter::new(&mut *output);
+                    fast_writer.write_all(&self.encoded_schema)?;
+                    fast_writer.write_batch(batch)?;
+                    output
+                }
+                CompressionCodec::Lz4Frame => {
+                    let mut wtr = lz4_flex::frame::FrameEncoder::new(output);
+                    let mut fast_writer = BatchWriter::new(&mut wtr);
+                    fast_writer.write_all(&self.encoded_schema)?;
+                    fast_writer.write_batch(batch)?;
+                    wtr.finish().map_err(|e| {
+                        DataFusionError::Execution(format!("lz4 compression error: {}", e))
+                    })?
+                }
+                CompressionCodec::Zstd(level) => {
+                    let mut encoder = zstd::Encoder::new(output, *level)?;
+                    let mut fast_writer = BatchWriter::new(&mut encoder);
+                    fast_writer.write_all(&self.encoded_schema)?;
+                    fast_writer.write_batch(batch)?;
+                    encoder.finish()?
+                }
+                CompressionCodec::Snappy => {
+                    let mut encoder = snap::write::FrameEncoder::new(output);
+                    let mut fast_writer = BatchWriter::new(&mut encoder);
+                    fast_writer.write_all(&self.encoded_schema)?;
+                    fast_writer.write_batch(batch)?;
+                    encoder.into_inner().map_err(|e| {
+                        DataFusionError::Execution(format!("snappy compression error: {}", e))
+                    })?
+                }
+            }
+        } else {
+            match &self.codec {
+                CompressionCodec::None => {
+                    let mut arrow_writer = StreamWriter::try_new(output, &batch.schema())?;
+                    arrow_writer.write(batch)?;
+                    arrow_writer.finish()?;
+                    arrow_writer.into_inner()?
+                }
+                CompressionCodec::Lz4Frame => {
+                    let mut wtr = lz4_flex::frame::FrameEncoder::new(output);
+                    let mut arrow_writer = StreamWriter::try_new(&mut wtr, &batch.schema())?;
+                    arrow_writer.write(batch)?;
+                    arrow_writer.finish()?;
+                    wtr.finish().map_err(|e| {
+                        DataFusionError::Execution(format!("lz4 compression error: {}", e))
+                    })?
+                }
+
+                CompressionCodec::Zstd(level) => {
+                    let encoder = zstd::Encoder::new(output, *level)?;
+                    let mut arrow_writer = StreamWriter::try_new(encoder, &batch.schema())?;
+                    arrow_writer.write(batch)?;
+                    arrow_writer.finish()?;
+                    let zstd_encoder = arrow_writer.into_inner()?;
+                    zstd_encoder.finish()?
+                }
+
+                CompressionCodec::Snappy => {
+                    let mut wtr = snap::write::FrameEncoder::new(output);
+                    let mut arrow_writer = StreamWriter::try_new(&mut wtr, &batch.schema())?;
+                    arrow_writer.write(batch)?;
+                    arrow_writer.finish()?;
+                    wtr.into_inner().map_err(|e| {
+                        DataFusionError::Execution(format!("snappy compression error: {}", e))
+                    })?
+                }
+            }
+        };
+
+        // compute and validate the ipc length
+        let end_pos = output.stream_position()?;
+        let ipc_length = end_pos - start_pos - 8;
+        let max_size = i32::MAX as u64;
+        if ipc_length > max_size {
+            return Err(DataFusionError::Execution(format!(
+                "Shuffle block size {ipc_length} exceeds maximum size of 
{max_size}. \
+                Try reducing batch size or increasing compression level"
+            )));
+        }
+
+        // fill ipc length
+        output.seek(SeekFrom::Start(start_pos))?;
+        output.write_all(&ipc_length.to_le_bytes())?;
+        output.seek(SeekFrom::Start(end_pos))?;
+
+        timer.stop();
+
+        Ok((end_pos - start_pos) as usize)
+    }
+}
+
+pub fn read_ipc_compressed(bytes: &[u8]) -> Result<RecordBatch> {
+    let fast_encoding = match &bytes[4..8] {
+        b"AIPC" => false,
+        b"FAST" => true,
+        other => {
+            return Err(DataFusionError::Internal(format!(
+                "invalid encoding schema: {other:?}"
+            )))
+        }
+    };
+    match &bytes[0..4] {
+        b"SNAP" => {
+            let mut decoder = snap::read::FrameDecoder::new(&bytes[8..]);
+            if fast_encoding {
+                // TODO avoid reading bytes into interim buffer
+                let mut buffer = vec![];
+                decoder.read_to_end(&mut buffer)?;
+                let mut reader = BatchReader::new(&buffer);
+                reader.read_batch()
+            } else {
+                let mut reader = StreamReader::try_new(decoder, None)?;
+                reader.next().unwrap().map_err(|e| e.into())
+            }
+        }
+        b"LZ4_" => {
+            let mut decoder = lz4_flex::frame::FrameDecoder::new(&bytes[8..]);
+            if fast_encoding {
+                // TODO avoid reading bytes into interim buffer
+                let mut buffer = vec![];
+                decoder.read_to_end(&mut buffer)?;
+                let mut reader = BatchReader::new(&buffer);
+                reader.read_batch()
+            } else {
+                let mut reader = StreamReader::try_new(decoder, None)?;
+                reader.next().unwrap().map_err(|e| e.into())
+            }
+        }
+        b"ZSTD" => {
+            let mut decoder = zstd::Decoder::new(&bytes[8..])?;
+            if fast_encoding {
+                // TODO avoid reading bytes into interim buffer
+                let mut buffer = vec![];
+                decoder.read_to_end(&mut buffer)?;
+                let mut reader = BatchReader::new(&buffer);
+                reader.read_batch()
+            } else {
+                let mut reader = StreamReader::try_new(decoder, None)?;
+                reader.next().unwrap().map_err(|e| e.into())
+            }
+        }
+        b"NONE" => {
+            if fast_encoding {
+                let mut reader = BatchReader::new(&bytes[8..]);
+                reader.read_batch()
+            } else {
+                let mut reader = StreamReader::try_new(&bytes[8..], None)?;
+                reader.next().unwrap().map_err(|e| e.into())
+            }
+        }
+        other => Err(DataFusionError::Execution(format!(
+            "Failed to decode batch: invalid compression codec: {other:?}"
+        ))),
+    }
+}
+
+/// Checksum algorithms for writing IPC bytes.
+#[derive(Clone)]
+pub(crate) enum Checksum {
+    /// CRC32 checksum algorithm.
+    CRC32(Hasher),
+    /// Adler32 checksum algorithm.
+    Adler32(Adler32),
+}
+
+impl Checksum {
+    pub(crate) fn try_new(algo: i32, initial_opt: Option<u32>) -> CometResult<Self> {
+        match algo {
+            0 => {
+                let hasher = if let Some(initial) = initial_opt {
+                    Hasher::new_with_initial(initial)
+                } else {
+                    Hasher::new()
+                };
+                Ok(Checksum::CRC32(hasher))
+            }
+            1 => {
+                let hasher = if let Some(initial) = initial_opt {
+                    // Note that Adler32 initial state is not zero.
+                    // i.e., `Adler32::from_checksum(0)` is not the same as `Adler32::new()`.
+                    Adler32::from_checksum(initial)
+                } else {
+                    Adler32::new()
+                };
+                Ok(Checksum::Adler32(hasher))
+            }
+            _ => Err(CometError::Internal(
+                "Unsupported checksum algorithm".to_string(),
+            )),
+        }
+    }
+
+    pub(crate) fn update(&mut self, cursor: &mut Cursor<&mut Vec<u8>>) -> CometResult<()> {
+        match self {
+            Checksum::CRC32(hasher) => {
+                std::io::Seek::seek(cursor, SeekFrom::Start(0))?;
+                hasher.update(cursor.chunk());
+                Ok(())
+            }
+            Checksum::Adler32(hasher) => {
+                std::io::Seek::seek(cursor, SeekFrom::Start(0))?;
+                hasher.write(cursor.chunk());
+                Ok(())
+            }
+        }
+    }
+
+    pub(crate) fn finalize(self) -> u32 {
+        match self {
+            Checksum::CRC32(hasher) => hasher.finalize(),
+            Checksum::Adler32(hasher) => hasher.finish(),
+        }
+    }
+}
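
The resulting block layout, as written by ShuffleBlockWriter above: an 8-byte
little-endian compressed length, the field count (a usize, so 8 bytes on
64-bit builds), a 4-byte codec magic, then a 4-byte encoding magic.
read_ipc_compressed receives the bytes starting at the codec magic. A
hypothetical helper that decodes just the two magics, mirroring its dispatch:

    // Identify the codec and encoding magics at the start of a block slice.
    // Illustrative only; assumes the slice starts at the codec magic and has
    // at least 8 bytes.
    fn describe_block(bytes: &[u8]) -> Option<(&'static str, &'static str)> {
        let codec = match &bytes[0..4] {
            b"SNAP" => "snappy frame",
            b"LZ4_" => "lz4 frame",
            b"ZSTD" => "zstd",
            b"NONE" => "uncompressed",
            _ => return None,
        };
        let encoding = match &bytes[4..8] {
            b"FAST" => "Comet fast encoding",
            b"AIPC" => "Arrow IPC stream",
            _ => return None,
        };
        Some((codec, encoding))
    }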
diff --git a/native/core/src/execution/shuffle/mod.rs b/native/core/src/execution/shuffle/mod.rs
index 716034a61..acd7ff551 100644
--- a/native/core/src/execution/shuffle/mod.rs
+++ b/native/core/src/execution/shuffle/mod.rs
@@ -15,13 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod codec;
+pub(crate) mod builders;
+pub(crate) mod codec;
 mod list;
 mod map;
 pub mod row;
 mod shuffle_writer;
+
 pub use codec::BatchWriter;
 
-pub use shuffle_writer::{
-    read_ipc_compressed, CompressionCodec, ShuffleBlockWriter, ShuffleWriterExec,
-};
+pub use codec::{read_ipc_compressed, CompressionCodec, ShuffleBlockWriter};
+pub use shuffle_writer::ShuffleWriterExec;
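
For downstream code, the visible effect of this hunk is the import path: the
codec types are re-exported from the shuffle module root via codec instead of
shuffle_writer. For example:

    // Before this commit:
    // use crate::execution::shuffle::shuffle_writer::CompressionCodec;

    // After this commit:
    use crate::execution::shuffle::{read_ipc_compressed, CompressionCodec, ShuffleBlockWriter};
    use crate::execution::shuffle::ShuffleWriterExec;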
diff --git a/native/core/src/execution/shuffle/row.rs b/native/core/src/execution/shuffle/row.rs
index f9ecf4790..540bebb1d 100644
--- a/native/core/src/execution/shuffle/row.rs
+++ b/native/core/src/execution/shuffle/row.rs
@@ -21,9 +21,9 @@ use crate::{
     errors::CometError,
     execution::{
         shuffle::{
+            codec::{Checksum, ShuffleBlockWriter},
             list::{append_list_element, SparkUnsafeArray},
             map::{append_map_elements, get_map_key_value_dt, SparkUnsafeMap},
-            shuffle_writer::{Checksum, ShuffleBlockWriter},
         },
         utils::bytes_to_i128,
     },
@@ -292,7 +292,7 @@ macro_rules! downcast_builder_ref {
 }
 
 // Expose the macro for other modules.
-use crate::execution::shuffle::shuffle_writer::CompressionCodec;
+use crate::execution::shuffle::CompressionCodec;
 pub(crate) use downcast_builder_ref;
 
 /// Appends field of row to the given struct builder. `dt` is the data type of the field.
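
row.rs now imports Checksum and ShuffleBlockWriter from codec. A minimal,
hypothetical sketch of the Checksum API defined above (algorithm 0 selects
CRC32, 1 selects Adler32; the buffer contents are made up):

    use std::io::Cursor;

    use crate::errors::CometResult;
    use crate::execution::shuffle::codec::Checksum;

    fn checksum_example() -> CometResult<u32> {
        let mut buf: Vec<u8> = b"example shuffle bytes".to_vec();
        let mut cursor = Cursor::new(&mut buf);
        // 0 => CRC32; pass Some(initial) to resume from a previous checksum value.
        let mut checksum = Checksum::try_new(0, None)?;
        // update() seeks the cursor back to the start and hashes its contents.
        checksum.update(&mut cursor)?;
        Ok(checksum.finalize())
    }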
diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs
index 70e832a73..696bfd05d 100644
--- a/native/core/src/execution/shuffle/shuffle_writer.rs
+++ b/native/core/src/execution/shuffle/shuffle_writer.rs
@@ -17,25 +17,14 @@
 
 //! Defines the External shuffle repartition plan.
 
-use crate::execution::shuffle::codec::{fast_codec_supports_type, BatchReader};
-use crate::execution::shuffle::BatchWriter;
-use crate::{
-    common::bit::ceil,
-    errors::{CometError, CometResult},
+use crate::execution::shuffle::builders::{
+    append_columns, make_batch, new_array_builders, slot_size,
 };
-use arrow::ipc::reader::StreamReader;
-use arrow::{datatypes::*, ipc::writer::StreamWriter};
+use crate::execution::shuffle::{CompressionCodec, ShuffleBlockWriter};
 use async_trait::async_trait;
-use bytes::Buf;
-use crc32fast::Hasher;
 use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
 use datafusion::{
-    arrow::{
-        array::*,
-        datatypes::{DataType, SchemaRef, TimeUnit},
-        error::{ArrowError, Result as ArrowResult},
-        record_batch::RecordBatch,
-    },
+    arrow::{array::*, datatypes::SchemaRef, error::ArrowError, record_batch::RecordBatch},
     error::{DataFusionError, Result},
     execution::{
         context::TaskContext,
@@ -57,7 +46,6 @@ use datafusion_physical_expr::EquivalenceProperties;
 use futures::executor::block_on;
 use futures::{lock::Mutex, Stream, StreamExt, TryFutureExt, TryStreamExt};
 use itertools::Itertools;
-use simd_adler32::Adler32;
 use std::io::Error;
 use std::{
     any::Any,
@@ -71,14 +59,6 @@ use std::{
 };
 use tokio::time::Instant;
 
-/// The status of appending rows to a partition buffer.
-enum AppendRowStatus {
-    /// The difference in memory usage after appending rows
-    MemDiff(Result<isize>),
-    /// The index of the next row to append
-    StartIndex(usize),
-}
-
 /// The shuffle writer operator maps each input partition to M output partitions based on a
 /// partitioning scheme. No guarantees are made about the order of the resulting partitions.
 #[derive(Debug)]
@@ -101,6 +81,36 @@ pub struct ShuffleWriterExec {
     enable_fast_encoding: bool,
 }
 
+impl ShuffleWriterExec {
+    /// Create a new ShuffleWriterExec
+    pub fn try_new(
+        input: Arc<dyn ExecutionPlan>,
+        partitioning: Partitioning,
+        codec: CompressionCodec,
+        output_data_file: String,
+        output_index_file: String,
+        enable_fast_encoding: bool,
+    ) -> Result<Self> {
+        let cache = PlanProperties::new(
+            EquivalenceProperties::new(Arc::clone(&input.schema())),
+            partitioning.clone(),
+            EmissionType::Final,
+            Boundedness::Bounded,
+        );
+
+        Ok(ShuffleWriterExec {
+            input,
+            partitioning,
+            metrics: ExecutionPlanMetricsSet::new(),
+            output_data_file,
+            output_index_file,
+            cache,
+            codec,
+            enable_fast_encoding,
+        })
+    }
+}
+
 impl DisplayAs for ShuffleWriterExec {
     fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         match t {
@@ -122,6 +132,22 @@ impl ExecutionPlan for ShuffleWriterExec {
         self
     }
 
+    fn name(&self) -> &str {
+        "ShuffleWriterExec"
+    }
+
+    fn metrics(&self) -> Option<MetricsSet> {
+        Some(self.metrics.clone_inner())
+    }
+
+    fn statistics(&self) -> Result<Statistics> {
+        self.input.statistics()
+    }
+
+    fn properties(&self) -> &PlanProperties {
+        &self.cache
+    }
+
     /// Get the schema for this execution plan
     fn schema(&self) -> SchemaRef {
         self.input.schema()
@@ -175,569 +201,138 @@ impl ExecutionPlan for ShuffleWriterExec {
             .try_flatten(),
         )))
     }
+}
 
-    fn metrics(&self) -> Option<MetricsSet> {
-        Some(self.metrics.clone_inner())
-    }
+#[allow(clippy::too_many_arguments)]
+async fn external_shuffle(
+    mut input: SendableRecordBatchStream,
+    partition_id: usize,
+    output_data_file: String,
+    output_index_file: String,
+    partitioning: Partitioning,
+    metrics: ShuffleRepartitionerMetrics,
+    context: Arc<TaskContext>,
+    codec: CompressionCodec,
+    enable_fast_encoding: bool,
+) -> Result<SendableRecordBatchStream> {
+    let schema = input.schema();
+    let mut repartitioner = ShuffleRepartitioner::try_new(
+        partition_id,
+        output_data_file,
+        output_index_file,
+        Arc::clone(&schema),
+        partitioning,
+        metrics,
+        context.runtime_env(),
+        context.session_config().batch_size(),
+        codec,
+        enable_fast_encoding,
+    )?;
 
-    fn statistics(&self) -> Result<Statistics> {
-        self.input.statistics()
+    while let Some(batch) = input.next().await {
+        // Block on the repartitioner to insert the batch and shuffle the rows
+        // into the corresponding partition buffer.
+        // Otherwise, pulling the next batch from the input stream might overwrite the
+        // current batch in the repartitioner.
+        block_on(repartitioner.insert_batch(batch?))?;
     }
+    repartitioner.shuffle_write().await
+}
 
-    fn properties(&self) -> &PlanProperties {
-        &self.cache
-    }
+struct ShuffleRepartitionerMetrics {
+    /// Baseline metrics
+    baseline: BaselineMetrics,
 
-    fn name(&self) -> &str {
-        "ShuffleWriterExec"
-    }
-}
+    /// Time to perform repartitioning
+    repart_time: Time,
 
-impl ShuffleWriterExec {
-    /// Create a new ShuffleWriterExec
-    pub fn try_new(
-        input: Arc<dyn ExecutionPlan>,
-        partitioning: Partitioning,
-        codec: CompressionCodec,
-        output_data_file: String,
-        output_index_file: String,
-        enable_fast_encoding: bool,
-    ) -> Result<Self> {
-        let cache = PlanProperties::new(
-            EquivalenceProperties::new(Arc::clone(&input.schema())),
-            partitioning.clone(),
-            EmissionType::Final,
-            Boundedness::Bounded,
-        );
+    /// Time interacting with memory pool
+    mempool_time: Time,
 
-        Ok(ShuffleWriterExec {
-            input,
-            partitioning,
-            metrics: ExecutionPlanMetricsSet::new(),
-            output_data_file,
-            output_index_file,
-            cache,
-            codec,
-            enable_fast_encoding,
-        })
+    /// Time encoding batches to IPC format
+    encode_time: Time,
+
+    /// Time spent writing to disk. Maps to "shuffleWriteTime" in Spark SQL Metrics.
+    write_time: Time,
+
+    /// Number of input batches
+    input_batches: Count,
+
+    /// count of spills during the execution of the operator
+    spill_count: Count,
+
+    /// total spilled bytes during the execution of the operator
+    spilled_bytes: Count,
+
+    /// The original size of spilled data. Differs from `spilled_bytes` because of compression.
+    data_size: Count,
+}
+
+impl ShuffleRepartitionerMetrics {
+    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
+        Self {
+            baseline: BaselineMetrics::new(metrics, partition),
+            repart_time: MetricBuilder::new(metrics).subset_time("repart_time", partition),
+            mempool_time: MetricBuilder::new(metrics).subset_time("mempool_time", partition),
+            encode_time: MetricBuilder::new(metrics).subset_time("encode_time", partition),
+            write_time: MetricBuilder::new(metrics).subset_time("write_time", partition),
+            input_batches: MetricBuilder::new(metrics).counter("input_batches", partition),
+            spill_count: MetricBuilder::new(metrics).spill_count(partition),
+            spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition),
+            data_size: MetricBuilder::new(metrics).counter("data_size", partition),
+        }
     }
 }
 
-struct PartitionBuffer {
-    /// The schema of batches to be partitioned.
+struct ShuffleRepartitioner {
+    output_data_file: String,
+    output_index_file: String,
     schema: SchemaRef,
-    /// The "frozen" Arrow IPC bytes of active data. They are frozen when 
`flush` is called.
-    frozen: Vec<u8>,
-    /// Array builders for appending rows into buffering batches.
-    active: Vec<Box<dyn ArrayBuilder>>,
-    /// The estimation of memory size of active builders in bytes when they are filled.
-    active_slots_mem_size: usize,
-    /// Number of rows in active builders.
-    num_active_rows: usize,
-    /// The maximum number of rows in a batch. Once `num_active_rows` reaches `batch_size`,
-    /// the active array builders will be frozen and appended to frozen buffer `frozen`.
-    batch_size: usize,
-    /// Memory reservation for this partition buffer.
+    buffered_partitions: Vec<PartitionBuffer>,
+    spills: Mutex<Vec<SpillInfo>>,
+    /// Partitioning scheme to use
+    partitioning: Partitioning,
+    num_output_partitions: usize,
+    runtime: Arc<RuntimeEnv>,
+    metrics: ShuffleRepartitionerMetrics,
     reservation: MemoryReservation,
-    /// Writer that performs encoding and compression
-    shuffle_block_writer: ShuffleBlockWriter,
+    /// Hashes for each row in the current batch
+    hashes_buf: Vec<u32>,
+    /// Partition ids for each row in the current batch
+    partition_ids: Vec<u64>,
+    /// The configured batch size
+    batch_size: usize,
 }
 
-impl PartitionBuffer {
-    fn try_new(
+impl ShuffleRepartitioner {
+    #[allow(clippy::too_many_arguments)]
+    pub fn try_new(
+        partition_id: usize,
+        output_data_file: String,
+        output_index_file: String,
         schema: SchemaRef,
+        partitioning: Partitioning,
+        metrics: ShuffleRepartitionerMetrics,
+        runtime: Arc<RuntimeEnv>,
         batch_size: usize,
-        partition_id: usize,
-        runtime: &Arc<RuntimeEnv>,
         codec: CompressionCodec,
         enable_fast_encoding: bool,
     ) -> Result<Self> {
-        let reservation = MemoryConsumer::new(format!("PartitionBuffer[{}]", 
partition_id))
+        let num_output_partitions = partitioning.partition_count();
+        let reservation = 
MemoryConsumer::new(format!("ShuffleRepartitioner[{}]", partition_id))
             .with_can_spill(true)
             .register(&runtime.memory_pool);
-        let shuffle_block_writer =
-            ShuffleBlockWriter::try_new(schema.as_ref(), enable_fast_encoding, codec)?;
-        Ok(Self {
-            schema,
-            frozen: vec![],
-            active: vec![],
-            active_slots_mem_size: 0,
-            num_active_rows: 0,
-            batch_size,
-            reservation,
-            shuffle_block_writer,
-        })
-    }
-
-    /// Initializes active builders if necessary.
-    /// Returns error if memory reservation fails.
-    fn init_active_if_necessary(&mut self, metrics: &ShuffleRepartitionerMetrics) -> Result<isize> {
-        let mut mem_diff = 0;
 
-        if self.active.is_empty() {
-            // Estimate the memory size of active builders
-            if self.active_slots_mem_size == 0 {
-                self.active_slots_mem_size = self
-                    .schema
-                    .fields()
-                    .iter()
-                    .map(|field| slot_size(self.batch_size, field.data_type()))
-                    .sum::<usize>();
-            }
+        let mut hashes_buf = Vec::with_capacity(batch_size);
+        let mut partition_ids = Vec::with_capacity(batch_size);
 
-            let mut mempool_timer = metrics.mempool_time.timer();
-            self.reservation.try_grow(self.active_slots_mem_size)?;
-            mempool_timer.stop();
-
-            let mut repart_timer = metrics.repart_time.timer();
-            self.active = new_array_builders(&self.schema, self.batch_size);
-            repart_timer.stop();
-
-            mem_diff += self.active_slots_mem_size as isize;
-        }
-        Ok(mem_diff)
-    }
-
-    /// Appends rows of specified indices from columns into active array builders.
-    fn append_rows(
-        &mut self,
-        columns: &[ArrayRef],
-        indices: &[usize],
-        start_index: usize,
-        metrics: &ShuffleRepartitionerMetrics,
-    ) -> AppendRowStatus {
-        let mut mem_diff = 0;
-        let mut start = start_index;
-
-        // lazy init because some partition may be empty
-        let init = self.init_active_if_necessary(metrics);
-        if init.is_err() {
-            return AppendRowStatus::StartIndex(start);
-        }
-        mem_diff += init.unwrap();
-
-        while start < indices.len() {
-            let end = (start + self.batch_size).min(indices.len());
-
-            let mut repart_timer = metrics.repart_time.timer();
-            self.active
-                .iter_mut()
-                .zip(columns)
-                .for_each(|(builder, column)| {
-                    append_columns(builder, column, &indices[start..end], column.data_type());
-                });
-            self.num_active_rows += end - start;
-            repart_timer.stop();
-
-            if self.num_active_rows >= self.batch_size {
-                let flush = self.flush(metrics);
-                if let Err(e) = flush {
-                    return AppendRowStatus::MemDiff(Err(e));
-                }
-                mem_diff += flush.unwrap();
-
-                let init = self.init_active_if_necessary(metrics);
-                if init.is_err() {
-                    return AppendRowStatus::StartIndex(end);
-                }
-                mem_diff += init.unwrap();
-            }
-            start = end;
-        }
-        AppendRowStatus::MemDiff(Ok(mem_diff))
-    }
-
-    /// flush active data into frozen bytes
-    fn flush(&mut self, metrics: &ShuffleRepartitionerMetrics) -> Result<isize> {
-        if self.num_active_rows == 0 {
-            return Ok(0);
-        }
-        let mut mem_diff = 0isize;
-
-        // active -> staging
-        let active = std::mem::take(&mut self.active);
-        let num_rows = self.num_active_rows;
-        self.num_active_rows = 0;
-
-        let mut mempool_timer = metrics.mempool_time.timer();
-        self.reservation.try_shrink(self.active_slots_mem_size)?;
-        mempool_timer.stop();
-
-        let mut repart_timer = metrics.repart_time.timer();
-        let frozen_batch = make_batch(Arc::clone(&self.schema), active, num_rows)?;
-        repart_timer.stop();
-
-        let frozen_capacity_old = self.frozen.capacity();
-        let mut cursor = Cursor::new(&mut self.frozen);
-        cursor.seek(SeekFrom::End(0))?;
-        self.shuffle_block_writer
-            .write_batch(&frozen_batch, &mut cursor, &metrics.encode_time)?;
-
-        mem_diff += (self.frozen.capacity() - frozen_capacity_old) as isize;
-        Ok(mem_diff)
-    }
-}
-
-fn slot_size(len: usize, data_type: &DataType) -> usize {
-    match data_type {
-        DataType::Boolean => ceil(len, 8),
-        DataType::Int8 => len,
-        DataType::Int16 => len * 2,
-        DataType::Int32 => len * 4,
-        DataType::Int64 => len * 8,
-        DataType::UInt8 => len,
-        DataType::UInt16 => len * 2,
-        DataType::UInt32 => len * 4,
-        DataType::UInt64 => len * 8,
-        DataType::Float32 => len * 4,
-        DataType::Float64 => len * 8,
-        DataType::Date32 => len * 4,
-        DataType::Date64 => len * 8,
-        DataType::Time32(TimeUnit::Second) => len * 4,
-        DataType::Time32(TimeUnit::Millisecond) => len * 4,
-        DataType::Time64(TimeUnit::Microsecond) => len * 8,
-        DataType::Time64(TimeUnit::Nanosecond) => len * 8,
-        // TODO: this is not accurate, but should be good enough for now
-        DataType::Utf8 => len * 100 + len * 4,
-        DataType::LargeUtf8 => len * 100 + len * 8,
-        DataType::Decimal128(_, _) => len * 16,
-        DataType::Dictionary(key_type, value_type) => {
-            // TODO: this is not accurate, but should be good enough for now
-            slot_size(len, key_type.as_ref()) + slot_size(len / 10, 
value_type.as_ref())
-        }
-        // TODO: this is not accurate, but should be good enough for now
-        DataType::Binary => len * 100 + len * 4,
-        DataType::LargeBinary => len * 100 + len * 8,
-        DataType::FixedSizeBinary(s) => len * (*s as usize),
-        DataType::Timestamp(_, _) => len * 8,
-        dt => unimplemented!(
-            "{}",
-            format!("data type {dt} not supported in shuffle write")
-        ),
-    }
-}
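
As a rough worked example of this heuristic (the batch size and two-column
schema here are assumed, not part of the patch):

    // Estimated builder memory for one batch of 8192 rows of (Int64, Utf8):
    let est = slot_size(8192, &DataType::Int64)   // 8192 * 8   =  65_536
            + slot_size(8192, &DataType::Utf8);   // 8192 * 104 = 851_968
    assert_eq!(est, 917_504);                     // ~0.9 MiB reserved up front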
-
-fn append_columns(
-    to: &mut Box<dyn ArrayBuilder>,
-    from: &Arc<dyn Array>,
-    indices: &[usize],
-    data_type: &DataType,
-) {
-    /// Append values from `from` to `to` using `indices`.
-    macro_rules! append {
-        ($arrowty:ident) => {{
-            type B = paste::paste! {[< $arrowty Builder >]};
-            type A = paste::paste! {[< $arrowty Array >]};
-            let t = to.as_any_mut().downcast_mut::<B>().unwrap();
-            let f = from.as_any().downcast_ref::<A>().unwrap();
-            for &i in indices {
-                if f.is_valid(i) {
-                    t.append_value(f.value(i));
-                } else {
-                    t.append_null();
-                }
-            }
-        }};
-    }
-
-    /// Some array builders (e.g. `FixedSizeBinary`) have an `append_value` 
method that returns
-    /// a `Result`.
-    macro_rules! append_unwrap {
-        ($arrowty:ident) => {{
-            type B = paste::paste! {[< $arrowty Builder >]};
-            type A = paste::paste! {[< $arrowty Array >]};
-            let t = to.as_any_mut().downcast_mut::<B>().unwrap();
-            let f = from.as_any().downcast_ref::<A>().unwrap();
-            for &i in indices {
-                if f.is_valid(i) {
-                    t.append_value(f.value(i)).unwrap();
-                } else {
-                    t.append_null();
-                }
-            }
-        }};
-    }
-
-    /// Appends values from a dictionary array to a dictionary builder.
-    macro_rules! append_dict {
-        ($kt:ty, $builder:ty, $dict_array:ty) => {{
-            let t = to.as_any_mut().downcast_mut::<$builder>().unwrap();
-            let f = from
-                .as_any()
-                .downcast_ref::<DictionaryArray<$kt>>()
-                .unwrap()
-                .downcast_dict::<$dict_array>()
-                .unwrap();
-            for &i in indices {
-                if f.is_valid(i) {
-                    t.append_value(f.value(i));
-                } else {
-                    t.append_null();
-                }
-            }
-        }};
-    }
-
-    macro_rules! append_dict_helper {
-        ($kt:ident, $ty:ty, $dict_array:ty) => {{
-            match $kt.as_ref() {
-                DataType::Int8 => append_dict!(Int8Type, 
PrimitiveDictionaryBuilder<Int8Type, $ty>, $dict_array),
-                DataType::Int16 => append_dict!(Int16Type, 
PrimitiveDictionaryBuilder<Int16Type, $ty>, $dict_array),
-                DataType::Int32 => append_dict!(Int32Type, 
PrimitiveDictionaryBuilder<Int32Type, $ty>, $dict_array),
-                DataType::Int64 => append_dict!(Int64Type, 
PrimitiveDictionaryBuilder<Int64Type, $ty>, $dict_array),
-                DataType::UInt8 => append_dict!(UInt8Type, 
PrimitiveDictionaryBuilder<UInt8Type, $ty>, $dict_array),
-                DataType::UInt16 => {
-                    append_dict!(UInt16Type, 
PrimitiveDictionaryBuilder<UInt16Type, $ty>, $dict_array)
-                }
-                DataType::UInt32 => {
-                    append_dict!(UInt32Type, 
PrimitiveDictionaryBuilder<UInt32Type, $ty>, $dict_array)
-                }
-                DataType::UInt64 => {
-                    append_dict!(UInt64Type, 
PrimitiveDictionaryBuilder<UInt64Type, $ty>, $dict_array)
-                }
-                _ => unreachable!("Unknown key type for dictionary"),
-            }
-        }};
-    }
-
-    macro_rules! primitive_append_dict_helper {
-        ($kt:ident, $vt:ident) => {
-            match $vt.as_ref() {
-                DataType::Int8 => {
-                    append_dict_helper!($kt, Int8Type, Int8Array)
-                }
-                DataType::Int16 => {
-                    append_dict_helper!($kt, Int16Type, Int16Array)
-                }
-                DataType::Int32 => {
-                    append_dict_helper!($kt, Int32Type, Int32Array)
-                }
-                DataType::Int64 => {
-                    append_dict_helper!($kt, Int64Type, Int64Array)
-                }
-                DataType::UInt8 => {
-                    append_dict_helper!($kt, UInt8Type, UInt8Array)
-                }
-                DataType::UInt16 => {
-                    append_dict_helper!($kt, UInt16Type, UInt16Array)
-                }
-                DataType::UInt32 => {
-                    append_dict_helper!($kt, UInt32Type, UInt32Array)
-                }
-                DataType::UInt64 => {
-                    append_dict_helper!($kt, UInt64Type, UInt64Array)
-                }
-                DataType::Float32 => {
-                    append_dict_helper!($kt, Float32Type, Float32Array)
-                }
-                DataType::Float64 => {
-                    append_dict_helper!($kt, Float64Type, Float64Array)
-                }
-                DataType::Decimal128(_, _) => {
-                    append_dict_helper!($kt, Decimal128Type, Decimal128Array)
-                }
-                DataType::Timestamp(TimeUnit::Microsecond, _) => {
-                    append_dict_helper!($kt, TimestampMicrosecondType, 
TimestampMicrosecondArray)
-                }
-                DataType::Date32 => {
-                    append_dict_helper!($kt, Date32Type, Date32Array)
-                }
-                DataType::Date64 => {
-                    append_dict_helper!($kt, Date64Type, Date64Array)
-                }
-                t => unimplemented!("{:?} is not supported for appending 
dictionary builder", t),
-            }
-        };
-    }
-
-    macro_rules! append_byte_dict {
-        ($kt:ident, $byte_type:ty, $array_type:ty) => {{
-            match $kt.as_ref() {
-                DataType::Int8 => {
-                    append_dict!(Int8Type, 
GenericByteDictionaryBuilder<Int8Type, $byte_type>, $array_type)
-                }
-                DataType::Int16 => {
-                    append_dict!(Int16Type,  
GenericByteDictionaryBuilder<Int16Type, $byte_type>, $array_type)
-                }
-                DataType::Int32 => {
-                    append_dict!(Int32Type,  
GenericByteDictionaryBuilder<Int32Type, $byte_type>, $array_type)
-                }
-                DataType::Int64 => {
-                    append_dict!(Int64Type,  
GenericByteDictionaryBuilder<Int64Type, $byte_type>, $array_type)
-                }
-                DataType::UInt8 => {
-                    append_dict!(UInt8Type,  
GenericByteDictionaryBuilder<UInt8Type, $byte_type>, $array_type)
-                }
-                DataType::UInt16 => {
-                    append_dict!(UInt16Type, 
GenericByteDictionaryBuilder<UInt16Type, $byte_type>, $array_type)
-                }
-                DataType::UInt32 => {
-                    append_dict!(UInt32Type, 
GenericByteDictionaryBuilder<UInt32Type, $byte_type>, $array_type)
-                }
-                DataType::UInt64 => {
-                    append_dict!(UInt64Type, 
GenericByteDictionaryBuilder<UInt64Type, $byte_type>, $array_type)
-                }
-                _ => unreachable!("Unknown key type for dictionary"),
-            }
-        }};
-    }
-
-    match data_type {
-        DataType::Boolean => append!(Boolean),
-        DataType::Int8 => append!(Int8),
-        DataType::Int16 => append!(Int16),
-        DataType::Int32 => append!(Int32),
-        DataType::Int64 => append!(Int64),
-        DataType::UInt8 => append!(UInt8),
-        DataType::UInt16 => append!(UInt16),
-        DataType::UInt32 => append!(UInt32),
-        DataType::UInt64 => append!(UInt64),
-        DataType::Float32 => append!(Float32),
-        DataType::Float64 => append!(Float64),
-        DataType::Date32 => append!(Date32),
-        DataType::Date64 => append!(Date64),
-        DataType::Time32(TimeUnit::Second) => append!(Time32Second),
-        DataType::Time32(TimeUnit::Millisecond) => append!(Time32Millisecond),
-        DataType::Time64(TimeUnit::Microsecond) => append!(Time64Microsecond),
-        DataType::Time64(TimeUnit::Nanosecond) => append!(Time64Nanosecond),
-        DataType::Timestamp(TimeUnit::Microsecond, _) => {
-            append!(TimestampMicrosecond)
-        }
-        DataType::Utf8 => append!(String),
-        DataType::LargeUtf8 => append!(LargeString),
-        DataType::Decimal128(_, _) => append!(Decimal128),
-        DataType::Dictionary(key_type, value_type) if 
value_type.is_primitive() => {
-            primitive_append_dict_helper!(key_type, value_type)
-        }
-        DataType::Dictionary(key_type, value_type)
-            if matches!(value_type.as_ref(), DataType::Utf8) =>
-        {
-            append_byte_dict!(key_type, GenericStringType<i32>, StringArray)
-        }
-        DataType::Dictionary(key_type, value_type)
-            if matches!(value_type.as_ref(), DataType::LargeUtf8) =>
-        {
-            append_byte_dict!(key_type, GenericStringType<i64>, 
LargeStringArray)
-        }
-        DataType::Dictionary(key_type, value_type)
-            if matches!(value_type.as_ref(), DataType::Binary) =>
-        {
-            append_byte_dict!(key_type, GenericBinaryType<i32>, BinaryArray)
-        }
-        DataType::Dictionary(key_type, value_type)
-            if matches!(value_type.as_ref(), DataType::LargeBinary) =>
-        {
-            append_byte_dict!(key_type, GenericBinaryType<i64>, 
LargeBinaryArray)
-        }
-        DataType::Binary => append!(Binary),
-        DataType::LargeBinary => append!(LargeBinary),
-        DataType::FixedSizeBinary(_) => append_unwrap!(FixedSizeBinary),
-        t => unimplemented!(
-            "{}",
-            format!("data type {} not supported in shuffle write", t)
-        ),
-    }
-}
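
As a hypothetical illustration (the `builders` and `batch` bindings are
assumed), copying three rows of the first column into its matching builder:

    // Copy rows 0, 2 and 4 of the first column into its builder.
    append_columns(
        &mut builders[0],
        batch.column(0),
        &[0, 2, 4],
        batch.column(0).data_type(),
    );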
-
-struct SpillInfo {
-    file: RefCountedTempFile,
-    offsets: Vec<u64>,
-}
-
-struct ShuffleRepartitioner {
-    output_data_file: String,
-    output_index_file: String,
-    schema: SchemaRef,
-    buffered_partitions: Vec<PartitionBuffer>,
-    spills: Mutex<Vec<SpillInfo>>,
-    /// Partitioning scheme to use
-    partitioning: Partitioning,
-    num_output_partitions: usize,
-    runtime: Arc<RuntimeEnv>,
-    metrics: ShuffleRepartitionerMetrics,
-    reservation: MemoryReservation,
-    /// Hashes for each row in the current batch
-    hashes_buf: Vec<u32>,
-    /// Partition ids for each row in the current batch
-    partition_ids: Vec<u64>,
-    /// The configured batch size
-    batch_size: usize,
-}
-
-struct ShuffleRepartitionerMetrics {
-    /// Common baseline metrics (output rows, elapsed compute time)
-    baseline: BaselineMetrics,
-
-    /// Time to perform repartitioning
-    repart_time: Time,
-
-    /// Time interacting with memory pool
-    mempool_time: Time,
-
-    /// Time encoding batches to IPC format
-    encode_time: Time,
-
-    /// Time spent writing to disk. Maps to "shuffleWriteTime" in Spark SQL 
Metrics.
-    write_time: Time,
-
-    /// Number of input batches
-    input_batches: Count,
-
-    /// Count of spills during the execution of the operator
-    spill_count: Count,
-
-    /// Total spilled bytes during the execution of the operator
-    spilled_bytes: Count,
-
-    /// The original size of spilled data. Differs from `spilled_bytes` 
because of compression.
-    data_size: Count,
-}
-
-impl ShuffleRepartitionerMetrics {
-    fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
-        Self {
-            baseline: BaselineMetrics::new(metrics, partition),
-            repart_time: 
MetricBuilder::new(metrics).subset_time("repart_time", partition),
-            mempool_time: 
MetricBuilder::new(metrics).subset_time("mempool_time", partition),
-            encode_time: 
MetricBuilder::new(metrics).subset_time("encode_time", partition),
-            write_time: MetricBuilder::new(metrics).subset_time("write_time", 
partition),
-            input_batches: 
MetricBuilder::new(metrics).counter("input_batches", partition),
-            spill_count: MetricBuilder::new(metrics).spill_count(partition),
-            spilled_bytes: 
MetricBuilder::new(metrics).spilled_bytes(partition),
-            data_size: MetricBuilder::new(metrics).counter("data_size", 
partition),
-        }
-    }
-}
-
-impl ShuffleRepartitioner {
-    #[allow(clippy::too_many_arguments)]
-    pub fn try_new(
-        partition_id: usize,
-        output_data_file: String,
-        output_index_file: String,
-        schema: SchemaRef,
-        partitioning: Partitioning,
-        metrics: ShuffleRepartitionerMetrics,
-        runtime: Arc<RuntimeEnv>,
-        batch_size: usize,
-        codec: CompressionCodec,
-        enable_fast_encoding: bool,
-    ) -> Result<Self> {
-        let num_output_partitions = partitioning.partition_count();
-        let reservation = 
MemoryConsumer::new(format!("ShuffleRepartitioner[{}]", partition_id))
-            .with_can_spill(true)
-            .register(&runtime.memory_pool);
-
-        let mut hashes_buf = Vec::with_capacity(batch_size);
-        let mut partition_ids = Vec::with_capacity(batch_size);
-
-        // Safety: `hashes_buf` will be filled with valid values before being 
used.
-        // `partition_ids` will be filled with valid values before being used.
-        unsafe {
-            hashes_buf.set_len(batch_size);
-            partition_ids.set_len(batch_size);
-        }
+        // Safety: `hashes_buf` will be filled with valid values before being 
used.
+        // `partition_ids` will be filled with valid values before being used.
+        unsafe {
+            hashes_buf.set_len(batch_size);
+            partition_ids.set_len(batch_size);
+        }
 
         Ok(Self {
             output_data_file,
@@ -1129,43 +724,6 @@ impl ShuffleRepartitioner {
     }
 }
 
-/// Consumes the `buffered_partitions` and spills them into a single temp 
shuffle output file
-fn spill_into(
-    buffered_partitions: &mut [PartitionBuffer],
-    path: &Path,
-    num_output_partitions: usize,
-    metrics: &ShuffleRepartitionerMetrics,
-) -> Result<Vec<u64>> {
-    let mut output_batches: Vec<Vec<u8>> = vec![vec![]; num_output_partitions];
-
-    for i in 0..num_output_partitions {
-        buffered_partitions[i].flush(metrics)?;
-        output_batches[i] = std::mem::take(&mut buffered_partitions[i].frozen);
-    }
-    let path = path.to_owned();
-
-    let mut write_timer = metrics.write_time.timer();
-
-    let mut offsets = vec![0; num_output_partitions + 1];
-    let mut spill_data = OpenOptions::new()
-        .write(true)
-        .create(true)
-        .truncate(true)
-        .open(path)
-        .map_err(|e| DataFusionError::Execution(format!("Error occurred while 
spilling {}", e)))?;
-
-    for i in 0..num_output_partitions {
-        offsets[i] = spill_data.stream_position()?;
-        spill_data.write_all(&output_batches[i])?;
-        output_batches[i].clear();
-    }
-    write_timer.stop();
-
-    // add one extra offset at last to ease partition length computation
-    offsets[num_output_partitions] = spill_data.stream_position()?;
-    Ok(offsets)
-}
-
 impl Debug for ShuffleRepartitioner {
     fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
         f.debug_struct("ShuffleRepartitioner")
@@ -1177,655 +735,209 @@ impl Debug for ShuffleRepartitioner {
     }
 }
 
-#[allow(clippy::too_many_arguments)]
-async fn external_shuffle(
-    mut input: SendableRecordBatchStream,
-    partition_id: usize,
-    output_data_file: String,
-    output_index_file: String,
-    partitioning: Partitioning,
-    metrics: ShuffleRepartitionerMetrics,
-    context: Arc<TaskContext>,
-    codec: CompressionCodec,
-    enable_fast_encoding: bool,
-) -> Result<SendableRecordBatchStream> {
-    let schema = input.schema();
-    let mut repartitioner = ShuffleRepartitioner::try_new(
-        partition_id,
-        output_data_file,
-        output_index_file,
-        Arc::clone(&schema),
-        partitioning,
-        metrics,
-        context.runtime_env(),
-        context.session_config().batch_size(),
-        codec,
-        enable_fast_encoding,
-    )?;
-
-    while let Some(batch) = input.next().await {
-        // Block on the repartitioner to insert the batch and shuffle the rows
-        // into the corresponding partition buffer.
-        // Otherwise, pulling the next batch from the input stream might 
overwrite the
-        // current batch in the repartitioner.
-        block_on(repartitioner.insert_batch(batch?))?;
-    }
-    repartitioner.shuffle_write().await
-}
-
-fn new_array_builders(schema: &SchemaRef, batch_size: usize) -> Vec<Box<dyn 
ArrayBuilder>> {
-    schema
-        .fields()
-        .iter()
-        .map(|field| {
-            let dt = field.data_type();
-            if matches!(dt, DataType::Dictionary(_, _)) {
-                make_dict_builder(dt, batch_size)
-            } else {
-                make_builder(dt, batch_size)
-            }
-        })
-        .collect::<Vec<_>>()
-}
-
-macro_rules! primitive_dict_builder_inner_helper {
-    ($kt:ty, $vt:ty, $capacity:ident) => {
-        Box::new(PrimitiveDictionaryBuilder::<$kt, $vt>::with_capacity(
-            $capacity,
-            $capacity / 100,
-        ))
-    };
-}
-
-macro_rules! primitive_dict_builder_helper {
-    ($kt:ty, $vt:ident, $capacity:ident) => {
-        match $vt.as_ref() {
-            DataType::Int8 => {
-                primitive_dict_builder_inner_helper!($kt, Int8Type, $capacity)
-            }
-            DataType::Int16 => {
-                primitive_dict_builder_inner_helper!($kt, Int16Type, $capacity)
-            }
-            DataType::Int32 => {
-                primitive_dict_builder_inner_helper!($kt, Int32Type, $capacity)
-            }
-            DataType::Int64 => {
-                primitive_dict_builder_inner_helper!($kt, Int64Type, $capacity)
-            }
-            DataType::UInt8 => {
-                primitive_dict_builder_inner_helper!($kt, UInt8Type, $capacity)
-            }
-            DataType::UInt16 => {
-                primitive_dict_builder_inner_helper!($kt, UInt16Type, 
$capacity)
-            }
-            DataType::UInt32 => {
-                primitive_dict_builder_inner_helper!($kt, UInt32Type, 
$capacity)
-            }
-            DataType::UInt64 => {
-                primitive_dict_builder_inner_helper!($kt, UInt64Type, 
$capacity)
-            }
-            DataType::Float32 => {
-                primitive_dict_builder_inner_helper!($kt, Float32Type, 
$capacity)
-            }
-            DataType::Float64 => {
-                primitive_dict_builder_inner_helper!($kt, Float64Type, 
$capacity)
-            }
-            DataType::Decimal128(p, s) => {
-                let keys_builder = PrimitiveBuilder::<$kt>::new();
-                let values_builder =
-                    
Decimal128Builder::new().with_data_type(DataType::Decimal128(*p, *s));
-                Box::new(
-                    PrimitiveDictionaryBuilder::<$kt, 
Decimal128Type>::new_from_empty_builders(
-                        keys_builder,
-                        values_builder,
-                    ),
-                )
-            }
-            DataType::Timestamp(TimeUnit::Microsecond, timezone) => {
-                let keys_builder = PrimitiveBuilder::<$kt>::new();
-                let values_builder = TimestampMicrosecondBuilder::new()
-                    .with_data_type(DataType::Timestamp(TimeUnit::Microsecond, 
timezone.clone()));
-                Box::new(
-                    PrimitiveDictionaryBuilder::<$kt, 
TimestampMicrosecondType>::new_from_empty_builders(
-                        keys_builder,
-                        values_builder,
-                    ),
-                )
-            }
-            DataType::Date32 => {
-                primitive_dict_builder_inner_helper!($kt, Date32Type, 
$capacity)
-            }
-            DataType::Date64 => {
-                primitive_dict_builder_inner_helper!($kt, Date64Type, 
$capacity)
-            }
-            t => unimplemented!("{:?} is not supported", t),
-        }
-    };
+/// The status of appending rows to a partition buffer.
+enum AppendRowStatus {
+    /// The difference in memory usage after appending rows
+    MemDiff(Result<isize>),
+    /// The index of the next row to append
+    StartIndex(usize),
 }
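
A hypothetical caller sketch, with `buffer`, `columns`, `indices`, `metrics`,
`total_mem`, and the spill step all assumed, showing how each variant is meant
to be handled:

    match buffer.append_rows(&columns, &indices, 0, &metrics) {
        // All rows were appended: fold the memory delta into our accounting.
        AppendRowStatus::MemDiff(Ok(diff)) => total_mem += diff,
        // Flushing failed part-way through: propagate the error.
        AppendRowStatus::MemDiff(Err(e)) => return Err(e),
        // Memory reservation failed: spill, then resume appending at `next`.
        AppendRowStatus::StartIndex(next) => { /* spill and retry from `next` */ }
    }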
 
-macro_rules! byte_dict_builder_inner_helper {
-    ($kt:ty, $capacity:ident, $builder:ident) => {
-        Box::new($builder::<$kt>::with_capacity(
-            $capacity,
-            $capacity / 100,
-            $capacity,
-        ))
-    };
+struct PartitionBuffer {
+    /// The schema of batches to be partitioned.
+    schema: SchemaRef,
+    /// The "frozen" Arrow IPC bytes of active data. They are frozen when 
`flush` is called.
+    frozen: Vec<u8>,
+    /// Array builders for appending rows into buffering batches.
+    active: Vec<Box<dyn ArrayBuilder>>,
+    /// The estimated memory size of the active builders, in bytes, once they 
are filled.
+    active_slots_mem_size: usize,
+    /// Number of rows in active builders.
+    num_active_rows: usize,
+    /// The maximum number of rows in a batch. Once `num_active_rows` reaches 
`batch_size`,
+    /// the active array builders will be frozen and appended to frozen buffer 
`frozen`.
+    batch_size: usize,
+    /// Memory reservation for this partition buffer.
+    reservation: MemoryReservation,
+    /// Writer that performs encoding and compression
+    shuffle_block_writer: ShuffleBlockWriter,
 }
 
-/// Returns a dictionary array builder with capacity `capacity` that 
corresponds to the given
-/// `datatype`. This function is useful for constructing arrays from 
arbitrary vectors with a
-/// known/expected schema.
-/// TODO: move this upstream.
-fn make_dict_builder(datatype: &DataType, capacity: usize) -> Box<dyn 
ArrayBuilder> {
-    match datatype {
-        DataType::Dictionary(key_type, value_type) if 
value_type.is_primitive() => {
-            match key_type.as_ref() {
-                DataType::Int8 => primitive_dict_builder_helper!(Int8Type, 
value_type, capacity),
-                DataType::Int16 => primitive_dict_builder_helper!(Int16Type, 
value_type, capacity),
-                DataType::Int32 => primitive_dict_builder_helper!(Int32Type, 
value_type, capacity),
-                DataType::Int64 => primitive_dict_builder_helper!(Int64Type, 
value_type, capacity),
-                DataType::UInt8 => primitive_dict_builder_helper!(UInt8Type, 
value_type, capacity),
-                DataType::UInt16 => {
-                    primitive_dict_builder_helper!(UInt16Type, value_type, 
capacity)
-                }
-                DataType::UInt32 => {
-                    primitive_dict_builder_helper!(UInt32Type, value_type, 
capacity)
-                }
-                DataType::UInt64 => {
-                    primitive_dict_builder_helper!(UInt64Type, value_type, 
capacity)
-                }
-                _ => unreachable!("Unknown key type for dictionary"),
-            }
-        }
-        DataType::Dictionary(key_type, value_type)
-            if matches!(value_type.as_ref(), DataType::Utf8) =>
-        {
-            match key_type.as_ref() {
-                DataType::Int8 => {
-                    byte_dict_builder_inner_helper!(Int8Type, capacity, 
StringDictionaryBuilder)
-                }
-                DataType::Int16 => {
-                    byte_dict_builder_inner_helper!(Int16Type, capacity, 
StringDictionaryBuilder)
-                }
-                DataType::Int32 => {
-                    byte_dict_builder_inner_helper!(Int32Type, capacity, 
StringDictionaryBuilder)
-                }
-                DataType::Int64 => {
-                    byte_dict_builder_inner_helper!(Int64Type, capacity, 
StringDictionaryBuilder)
-                }
-                DataType::UInt8 => {
-                    byte_dict_builder_inner_helper!(UInt8Type, capacity, 
StringDictionaryBuilder)
-                }
-                DataType::UInt16 => {
-                    byte_dict_builder_inner_helper!(UInt16Type, capacity, 
StringDictionaryBuilder)
-                }
-                DataType::UInt32 => {
-                    byte_dict_builder_inner_helper!(UInt32Type, capacity, 
StringDictionaryBuilder)
-                }
-                DataType::UInt64 => {
-                    byte_dict_builder_inner_helper!(UInt64Type, capacity, 
StringDictionaryBuilder)
-                }
-                _ => unreachable!("Unknown key type for dictionary"),
-            }
-        }
-        DataType::Dictionary(key_type, value_type)
-            if matches!(value_type.as_ref(), DataType::LargeUtf8) =>
-        {
-            match key_type.as_ref() {
-                DataType::Int8 => byte_dict_builder_inner_helper!(
-                    Int8Type,
-                    capacity,
-                    LargeStringDictionaryBuilder
-                ),
-                DataType::Int16 => byte_dict_builder_inner_helper!(
-                    Int16Type,
-                    capacity,
-                    LargeStringDictionaryBuilder
-                ),
-                DataType::Int32 => byte_dict_builder_inner_helper!(
-                    Int32Type,
-                    capacity,
-                    LargeStringDictionaryBuilder
-                ),
-                DataType::Int64 => byte_dict_builder_inner_helper!(
-                    Int64Type,
-                    capacity,
-                    LargeStringDictionaryBuilder
-                ),
-                DataType::UInt8 => byte_dict_builder_inner_helper!(
-                    UInt8Type,
-                    capacity,
-                    LargeStringDictionaryBuilder
-                ),
-                DataType::UInt16 => {
-                    byte_dict_builder_inner_helper!(
-                        UInt16Type,
-                        capacity,
-                        LargeStringDictionaryBuilder
-                    )
-                }
-                DataType::UInt32 => {
-                    byte_dict_builder_inner_helper!(
-                        UInt32Type,
-                        capacity,
-                        LargeStringDictionaryBuilder
-                    )
-                }
-                DataType::UInt64 => {
-                    byte_dict_builder_inner_helper!(
-                        UInt64Type,
-                        capacity,
-                        LargeStringDictionaryBuilder
-                    )
-                }
-                _ => unreachable!("Unknown key type for dictionary"),
-            }
-        }
-        DataType::Dictionary(key_type, value_type)
-            if matches!(value_type.as_ref(), DataType::Binary) =>
-        {
-            match key_type.as_ref() {
-                DataType::Int8 => {
-                    byte_dict_builder_inner_helper!(Int8Type, capacity, 
BinaryDictionaryBuilder)
-                }
-                DataType::Int16 => {
-                    byte_dict_builder_inner_helper!(Int16Type, capacity, 
BinaryDictionaryBuilder)
-                }
-                DataType::Int32 => {
-                    byte_dict_builder_inner_helper!(Int32Type, capacity, 
BinaryDictionaryBuilder)
-                }
-                DataType::Int64 => {
-                    byte_dict_builder_inner_helper!(Int64Type, capacity, 
BinaryDictionaryBuilder)
-                }
-                DataType::UInt8 => {
-                    byte_dict_builder_inner_helper!(UInt8Type, capacity, 
BinaryDictionaryBuilder)
-                }
-                DataType::UInt16 => {
-                    byte_dict_builder_inner_helper!(UInt16Type, capacity, 
BinaryDictionaryBuilder)
-                }
-                DataType::UInt32 => {
-                    byte_dict_builder_inner_helper!(UInt32Type, capacity, 
BinaryDictionaryBuilder)
-                }
-                DataType::UInt64 => {
-                    byte_dict_builder_inner_helper!(UInt64Type, capacity, 
BinaryDictionaryBuilder)
-                }
-                _ => unreachable!("Unknown key type for dictionary"),
-            }
-        }
-        DataType::Dictionary(key_type, value_type)
-            if matches!(value_type.as_ref(), DataType::LargeBinary) =>
-        {
-            match key_type.as_ref() {
-                DataType::Int8 => byte_dict_builder_inner_helper!(
-                    Int8Type,
-                    capacity,
-                    LargeBinaryDictionaryBuilder
-                ),
-                DataType::Int16 => byte_dict_builder_inner_helper!(
-                    Int16Type,
-                    capacity,
-                    LargeBinaryDictionaryBuilder
-                ),
-                DataType::Int32 => byte_dict_builder_inner_helper!(
-                    Int32Type,
-                    capacity,
-                    LargeBinaryDictionaryBuilder
-                ),
-                DataType::Int64 => byte_dict_builder_inner_helper!(
-                    Int64Type,
-                    capacity,
-                    LargeBinaryDictionaryBuilder
-                ),
-                DataType::UInt8 => byte_dict_builder_inner_helper!(
-                    UInt8Type,
-                    capacity,
-                    LargeBinaryDictionaryBuilder
-                ),
-                DataType::UInt16 => {
-                    byte_dict_builder_inner_helper!(
-                        UInt16Type,
-                        capacity,
-                        LargeBinaryDictionaryBuilder
-                    )
-                }
-                DataType::UInt32 => {
-                    byte_dict_builder_inner_helper!(
-                        UInt32Type,
-                        capacity,
-                        LargeBinaryDictionaryBuilder
-                    )
-                }
-                DataType::UInt64 => {
-                    byte_dict_builder_inner_helper!(
-                        UInt64Type,
-                        capacity,
-                        LargeBinaryDictionaryBuilder
-                    )
-                }
-                _ => unreachable!("Unknown key type for dictionary"),
-            }
-        }
-        t => panic!("Data type {t:?} is not currently supported"),
+impl PartitionBuffer {
+    fn try_new(
+        schema: SchemaRef,
+        batch_size: usize,
+        partition_id: usize,
+        runtime: &Arc<RuntimeEnv>,
+        codec: CompressionCodec,
+        enable_fast_encoding: bool,
+    ) -> Result<Self> {
+        let reservation = MemoryConsumer::new(format!("PartitionBuffer[{}]", 
partition_id))
+            .with_can_spill(true)
+            .register(&runtime.memory_pool);
+        let shuffle_block_writer =
+            ShuffleBlockWriter::try_new(schema.as_ref(), enable_fast_encoding, 
codec)?;
+        Ok(Self {
+            schema,
+            frozen: vec![],
+            active: vec![],
+            active_slots_mem_size: 0,
+            num_active_rows: 0,
+            batch_size,
+            reservation,
+            shuffle_block_writer,
+        })
     }
-}
-
-fn make_batch(
-    schema: SchemaRef,
-    mut arrays: Vec<Box<dyn ArrayBuilder>>,
-    row_count: usize,
-) -> ArrowResult<RecordBatch> {
-    let columns = arrays.iter_mut().map(|array| array.finish()).collect();
-    let options = 
RecordBatchOptions::new().with_row_count(Option::from(row_count));
-    RecordBatch::try_new_with_options(schema, columns, &options)
-}
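
A minimal sketch (the schema, row count, and the elided append loop are
assumed) of how `new_array_builders` and `make_batch` pair up:

    let mut builders = new_array_builders(&schema, batch_size);
    // ... append rows into `builders` via append_columns ...
    let batch = make_batch(Arc::clone(&schema), builders, num_rows)?;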
 
-/// Checksum algorithms for writing IPC bytes.
-#[derive(Clone)]
-pub(crate) enum Checksum {
-    /// CRC32 checksum algorithm.
-    CRC32(Hasher),
-    /// Adler32 checksum algorithm.
-    Adler32(Adler32),
-}
+    /// Initializes active builders if necessary.
+    /// Returns an error if the memory reservation fails.
+    fn init_active_if_necessary(&mut self, metrics: 
&ShuffleRepartitionerMetrics) -> Result<isize> {
+        let mut mem_diff = 0;
 
-impl Checksum {
-    pub(crate) fn try_new(algo: i32, initial_opt: Option<u32>) -> 
CometResult<Self> {
-        match algo {
-            0 => {
-                let hasher = if let Some(initial) = initial_opt {
-                    Hasher::new_with_initial(initial)
-                } else {
-                    Hasher::new()
-                };
-                Ok(Checksum::CRC32(hasher))
-            }
-            1 => {
-                let hasher = if let Some(initial) = initial_opt {
-                    // Note that Adler32 initial state is not zero.
-                    // i.e., `Adler32::from_checksum(0)` is not the same as 
`Adler32::new()`.
-                    Adler32::from_checksum(initial)
-                } else {
-                    Adler32::new()
-                };
-                Ok(Checksum::Adler32(hasher))
+        if self.active.is_empty() {
+            // Estimate the memory size of active builders
+            if self.active_slots_mem_size == 0 {
+                self.active_slots_mem_size = self
+                    .schema
+                    .fields()
+                    .iter()
+                    .map(|field| slot_size(self.batch_size, field.data_type()))
+                    .sum::<usize>();
             }
-            _ => Err(CometError::Internal(
-                "Unsupported checksum algorithm".to_string(),
-            )),
-        }
-    }
 
-    pub(crate) fn update(&mut self, cursor: &mut Cursor<&mut Vec<u8>>) -> 
CometResult<()> {
-        match self {
-            Checksum::CRC32(hasher) => {
-                std::io::Seek::seek(cursor, SeekFrom::Start(0))?;
-                hasher.update(cursor.chunk());
-                Ok(())
-            }
-            Checksum::Adler32(hasher) => {
-                std::io::Seek::seek(cursor, SeekFrom::Start(0))?;
-                hasher.write(cursor.chunk());
-                Ok(())
-            }
-        }
-    }
+            let mut mempool_timer = metrics.mempool_time.timer();
+            self.reservation.try_grow(self.active_slots_mem_size)?;
+            mempool_timer.stop();
+
+            let mut repart_timer = metrics.repart_time.timer();
+            self.active = new_array_builders(&self.schema, self.batch_size);
+            repart_timer.stop();
 
-    pub(crate) fn finalize(self) -> u32 {
-        match self {
-            Checksum::CRC32(hasher) => hasher.finalize(),
-            Checksum::Adler32(hasher) => hasher.finish(),
+            mem_diff += self.active_slots_mem_size as isize;
         }
+        Ok(mem_diff)
     }
-}
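
A minimal usage sketch (the checksummed buffer is assumed); per `try_new`
above, algorithm id 0 selects CRC32 and 1 selects Adler32:

    let mut bytes: Vec<u8> = ipc_bytes;             // bytes to checksum (assumed)
    let mut cursor = Cursor::new(&mut bytes);
    let mut checksum = Checksum::try_new(0, None)?; // CRC32, no initial state
    checksum.update(&mut cursor)?;                  // hashes from the start of the cursor
    let digest: u32 = checksum.finalize();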
-
-#[derive(Debug, Clone)]
-pub enum CompressionCodec {
-    None,
-    Lz4Frame,
-    Zstd(i32),
-    Snappy,
-}
 
-pub struct ShuffleBlockWriter {
-    fast_encoding: bool,
-    codec: CompressionCodec,
-    encoded_schema: Vec<u8>,
-    header_bytes: Vec<u8>,
-}
+    /// Appends rows of specified indices from columns into active array 
builders.
+    fn append_rows(
+        &mut self,
+        columns: &[ArrayRef],
+        indices: &[usize],
+        start_index: usize,
+        metrics: &ShuffleRepartitionerMetrics,
+    ) -> AppendRowStatus {
+        let mut mem_diff = 0;
+        let mut start = start_index;
 
-impl ShuffleBlockWriter {
-    pub fn try_new(
-        schema: &Schema,
-        enable_fast_encoding: bool,
-        codec: CompressionCodec,
-    ) -> Result<Self> {
-        let mut encoded_schema = vec![];
-
-        let enable_fast_encoding = enable_fast_encoding
-            && schema
-                .fields()
-                .iter()
-                .all(|f| fast_codec_supports_type(f.data_type()));
-
-        // encode the schema once and then reuse the encoded bytes for each 
batch
-        if enable_fast_encoding {
-            let mut w = BatchWriter::new(&mut encoded_schema);
-            w.write_partial_schema(schema)?;
+        // lazy init because some partition may be empty
+        let init = self.init_active_if_necessary(metrics);
+        if init.is_err() {
+            return AppendRowStatus::StartIndex(start);
         }
+        mem_diff += init.unwrap();
 
-        let header_bytes = Vec::with_capacity(24);
-        let mut cursor = Cursor::new(header_bytes);
-
-        // leave space for compressed message length
-        cursor.seek_relative(8)?;
+        while start < indices.len() {
+            let end = (start + self.batch_size).min(indices.len());
 
-        // write number of columns because JVM side needs to know how many 
addresses to allocate
-        let field_count = schema.fields().len();
-        cursor.write_all(&field_count.to_le_bytes())?;
+            let mut repart_timer = metrics.repart_time.timer();
+            self.active
+                .iter_mut()
+                .zip(columns)
+                .for_each(|(builder, column)| {
+                    append_columns(builder, column, &indices[start..end], 
column.data_type());
+                });
+            self.num_active_rows += end - start;
+            repart_timer.stop();
 
-        // write compression codec to header
-        let codec_header = match &codec {
-            CompressionCodec::Snappy => b"SNAP",
-            CompressionCodec::Lz4Frame => b"LZ4_",
-            CompressionCodec::Zstd(_) => b"ZSTD",
-            CompressionCodec::None => b"NONE",
-        };
-        cursor.write_all(codec_header)?;
+            if self.num_active_rows >= self.batch_size {
+                let flush = self.flush(metrics);
+                if let Err(e) = flush {
+                    return AppendRowStatus::MemDiff(Err(e));
+                }
+                mem_diff += flush.unwrap();
 
-        // write encoding scheme
-        if enable_fast_encoding {
-            cursor.write_all(b"FAST")?;
-        } else {
-            cursor.write_all(b"AIPC")?;
+                let init = self.init_active_if_necessary(metrics);
+                if init.is_err() {
+                    return AppendRowStatus::StartIndex(end);
+                }
+                mem_diff += init.unwrap();
+            }
+            start = end;
         }
-
-        let header_bytes = cursor.into_inner();
-
-        Ok(Self {
-            fast_encoding: enable_fast_encoding,
-            codec,
-            encoded_schema,
-            header_bytes,
-        })
+        AppendRowStatus::MemDiff(Ok(mem_diff))
     }
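
For orientation, the fixed 24-byte header assembled in `try_new` lays out as
follows (reconstructed from the code; offsets assume a 64-bit `usize`, and
the leading length field is only filled in later by `write_batch`):

    // [ 0.. 8]  block length excluding these 8 bytes (u64, little-endian)
    // [ 8..16]  number of columns (usize, little-endian)
    // [16..20]  codec tag: b"SNAP" | b"LZ4_" | b"ZSTD" | b"NONE"
    // [20..24]  encoding tag: b"FAST" | b"AIPC"
    // [24.. ]   encoded (and possibly compressed) batch payload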
 
-    /// Writes the given record batch as Arrow IPC bytes into the given writer.
-    /// Returns the number of bytes written.
-    pub fn write_batch<W: Write + Seek>(
-        &self,
-        batch: &RecordBatch,
-        output: &mut W,
-        ipc_time: &Time,
-    ) -> Result<usize> {
-        if batch.num_rows() == 0 {
+    /// Flushes active data into frozen bytes.
+    fn flush(&mut self, metrics: &ShuffleRepartitionerMetrics) -> 
Result<isize> {
+        if self.num_active_rows == 0 {
             return Ok(0);
         }
+        let mut mem_diff = 0isize;
 
-        let mut timer = ipc_time.timer();
-        let start_pos = output.stream_position()?;
-
-        // write header
-        output.write_all(&self.header_bytes)?;
+        // active -> staging
+        let active = std::mem::take(&mut self.active);
+        let num_rows = self.num_active_rows;
+        self.num_active_rows = 0;
 
-        let output = if self.fast_encoding {
-            match &self.codec {
-                CompressionCodec::None => {
-                    let mut fast_writer = BatchWriter::new(&mut *output);
-                    fast_writer.write_all(&self.encoded_schema)?;
-                    fast_writer.write_batch(batch)?;
-                    output
-                }
-                CompressionCodec::Lz4Frame => {
-                    let mut wtr = lz4_flex::frame::FrameEncoder::new(output);
-                    let mut fast_writer = BatchWriter::new(&mut wtr);
-                    fast_writer.write_all(&self.encoded_schema)?;
-                    fast_writer.write_batch(batch)?;
-                    wtr.finish().map_err(|e| {
-                        DataFusionError::Execution(format!("lz4 compression 
error: {}", e))
-                    })?
-                }
-                CompressionCodec::Zstd(level) => {
-                    let mut encoder = zstd::Encoder::new(output, *level)?;
-                    let mut fast_writer = BatchWriter::new(&mut encoder);
-                    fast_writer.write_all(&self.encoded_schema)?;
-                    fast_writer.write_batch(batch)?;
-                    encoder.finish()?
-                }
-                CompressionCodec::Snappy => {
-                    let mut encoder = snap::write::FrameEncoder::new(output);
-                    let mut fast_writer = BatchWriter::new(&mut encoder);
-                    fast_writer.write_all(&self.encoded_schema)?;
-                    fast_writer.write_batch(batch)?;
-                    encoder.into_inner().map_err(|e| {
-                        DataFusionError::Execution(format!("snappy compression 
error: {}", e))
-                    })?
-                }
-            }
-        } else {
-            match &self.codec {
-                CompressionCodec::None => {
-                    let mut arrow_writer = StreamWriter::try_new(output, 
&batch.schema())?;
-                    arrow_writer.write(batch)?;
-                    arrow_writer.finish()?;
-                    arrow_writer.into_inner()?
-                }
-                CompressionCodec::Lz4Frame => {
-                    let mut wtr = lz4_flex::frame::FrameEncoder::new(output);
-                    let mut arrow_writer = StreamWriter::try_new(&mut wtr, 
&batch.schema())?;
-                    arrow_writer.write(batch)?;
-                    arrow_writer.finish()?;
-                    wtr.finish().map_err(|e| {
-                        DataFusionError::Execution(format!("lz4 compression 
error: {}", e))
-                    })?
-                }
+        let mut mempool_timer = metrics.mempool_time.timer();
+        self.reservation.try_shrink(self.active_slots_mem_size)?;
+        mempool_timer.stop();
 
-                CompressionCodec::Zstd(level) => {
-                    let encoder = zstd::Encoder::new(output, *level)?;
-                    let mut arrow_writer = StreamWriter::try_new(encoder, 
&batch.schema())?;
-                    arrow_writer.write(batch)?;
-                    arrow_writer.finish()?;
-                    let zstd_encoder = arrow_writer.into_inner()?;
-                    zstd_encoder.finish()?
-                }
+        let mut repart_timer = metrics.repart_time.timer();
+        let frozen_batch = make_batch(Arc::clone(&self.schema), active, 
num_rows)?;
+        repart_timer.stop();
 
-                CompressionCodec::Snappy => {
-                    let mut wtr = snap::write::FrameEncoder::new(output);
-                    let mut arrow_writer = StreamWriter::try_new(&mut wtr, 
&batch.schema())?;
-                    arrow_writer.write(batch)?;
-                    arrow_writer.finish()?;
-                    wtr.into_inner().map_err(|e| {
-                        DataFusionError::Execution(format!("snappy compression 
error: {}", e))
-                    })?
-                }
-            }
-        };
+        let frozen_capacity_old = self.frozen.capacity();
+        let mut cursor = Cursor::new(&mut self.frozen);
+        cursor.seek(SeekFrom::End(0))?;
+        self.shuffle_block_writer
+            .write_batch(&frozen_batch, &mut cursor, &metrics.encode_time)?;
 
-        // compute the ipc length and validate it against the maximum block size
-        let end_pos = output.stream_position()?;
-        let ipc_length = end_pos - start_pos - 8;
-        let max_size = i32::MAX as u64;
-        if ipc_length > max_size {
-            return Err(DataFusionError::Execution(format!(
-                "Shuffle block size {ipc_length} exceeds maximum size of 
{max_size}. \
-                Try reducing batch size or increasing compression level"
-            )));
-        }
+        mem_diff += (self.frozen.capacity() - frozen_capacity_old) as isize;
+        Ok(mem_diff)
+    }
+}
 
-        // fill ipc length
-        output.seek(SeekFrom::Start(start_pos))?;
-        output.write_all(&ipc_length.to_le_bytes())?;
-        output.seek(SeekFrom::Start(end_pos))?;
+struct SpillInfo {
+    file: RefCountedTempFile,
+    offsets: Vec<u64>,
+}
 
-        timer.stop();
+/// Consumes the `buffered_partitions` and spills them into a single temp 
shuffle output file
+fn spill_into(
+    buffered_partitions: &mut [PartitionBuffer],
+    path: &Path,
+    num_output_partitions: usize,
+    metrics: &ShuffleRepartitionerMetrics,
+) -> Result<Vec<u64>> {
+    let mut output_batches: Vec<Vec<u8>> = vec![vec![]; num_output_partitions];
 
-        Ok((end_pos - start_pos) as usize)
+    for i in 0..num_output_partitions {
+        buffered_partitions[i].flush(metrics)?;
+        output_batches[i] = std::mem::take(&mut buffered_partitions[i].frozen);
     }
-}
+    let path = path.to_owned();
 
-pub fn read_ipc_compressed(bytes: &[u8]) -> Result<RecordBatch> {
-    let fast_encoding = match &bytes[4..8] {
-        b"AIPC" => false,
-        b"FAST" => true,
-        other => {
-            return Err(DataFusionError::Internal(format!(
-                "invalid encoding schema: {other:?}"
-            )))
-        }
-    };
-    match &bytes[0..4] {
-        b"SNAP" => {
-            let mut decoder = snap::read::FrameDecoder::new(&bytes[8..]);
-            if fast_encoding {
-                // TODO avoid reading bytes into interim buffer
-                let mut buffer = vec![];
-                decoder.read_to_end(&mut buffer)?;
-                let mut reader = BatchReader::new(&buffer);
-                reader.read_batch()
-            } else {
-                let mut reader = StreamReader::try_new(decoder, None)?;
-                reader.next().unwrap().map_err(|e| e.into())
-            }
-        }
-        b"LZ4_" => {
-            let mut decoder = lz4_flex::frame::FrameDecoder::new(&bytes[8..]);
-            if fast_encoding {
-                // TODO avoid reading bytes into interim buffer
-                let mut buffer = vec![];
-                decoder.read_to_end(&mut buffer)?;
-                let mut reader = BatchReader::new(&buffer);
-                reader.read_batch()
-            } else {
-                let mut reader = StreamReader::try_new(decoder, None)?;
-                reader.next().unwrap().map_err(|e| e.into())
-            }
-        }
-        b"ZSTD" => {
-            let mut decoder = zstd::Decoder::new(&bytes[8..])?;
-            if fast_encoding {
-                // TODO avoid reading bytes into interim buffer
-                let mut buffer = vec![];
-                decoder.read_to_end(&mut buffer)?;
-                let mut reader = BatchReader::new(&buffer);
-                reader.read_batch()
-            } else {
-                let mut reader = StreamReader::try_new(decoder, None)?;
-                reader.next().unwrap().map_err(|e| e.into())
-            }
-        }
-        b"NONE" => {
-            if fast_encoding {
-                let mut reader = BatchReader::new(&bytes[8..]);
-                reader.read_batch()
-            } else {
-                let mut reader = StreamReader::try_new(&bytes[8..], None)?;
-                reader.next().unwrap().map_err(|e| e.into())
-            }
-        }
-        other => Err(DataFusionError::Execution(format!(
-            "Failed to decode batch: invalid compression codec: {other:?}"
-        ))),
+    let mut write_timer = metrics.write_time.timer();
+
+    let mut offsets = vec![0; num_output_partitions + 1];
+    let mut spill_data = OpenOptions::new()
+        .write(true)
+        .create(true)
+        .truncate(true)
+        .open(path)
+        .map_err(|e| DataFusionError::Execution(format!("Error occurred while 
spilling {}", e)))?;
+
+    for i in 0..num_output_partitions {
+        offsets[i] = spill_data.stream_position()?;
+        spill_data.write_all(&output_batches[i])?;
+        output_batches[i].clear();
     }
+    write_timer.stop();
+
+    // add one extra offset at last to ease partition length computation
+    offsets[num_output_partitions] = spill_data.stream_position()?;
+    Ok(offsets)
 }
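
The extra trailing offset means the byte length of any partition in the spill
file is just the difference of adjacent offsets; a one-line sketch:

    // Byte length of partition `i` within the spill file (illustrative).
    let partition_len = |offsets: &[u64], i: usize| offsets[i + 1] - offsets[i];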
 
 /// A stream that yields no record batches, which represents the end of output.
@@ -1867,6 +979,8 @@ fn pmod(hash: u32, n: usize) -> usize {
 #[cfg(test)]
 mod test {
     use super::*;
+    use crate::execution::shuffle::read_ipc_compressed;
+    use arrow_schema::{DataType, Field, Schema};
     use datafusion::physical_plan::common::collect;
     use datafusion::physical_plan::memory::MemoryExec;
     use datafusion::prelude::SessionContext;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org
