This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
     new 58a36b620 chore: Re-organize shuffle writer code (#1439)
58a36b620 is described below

commit 58a36b62004c1e300d7f3113d9a64713a94c0db9
Author: Andy Grove <agr...@apache.org>
AuthorDate: Tue Feb 25 17:14:18 2025 -0700

    chore: Re-organize shuffle writer code (#1439)
---
 native/core/src/execution/shuffle/builders.rs      |  599 ++++++++
 native/core/src/execution/shuffle/codec.rs         |  318 +++-
 native/core/src/execution/shuffle/mod.rs           |    9 +-
 native/core/src/execution/shuffle/row.rs           |    4 +-
 .../core/src/execution/shuffle/shuffle_writer.rs   | 1544 +++++---------------
 5 files changed, 1252 insertions(+), 1222 deletions(-)

diff --git a/native/core/src/execution/shuffle/builders.rs b/native/core/src/execution/shuffle/builders.rs
new file mode 100644
index 000000000..184467b5d
--- /dev/null
+++ b/native/core/src/execution/shuffle/builders.rs
@@ -0,0 +1,599 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow_array::builder::{make_builder, ArrayBuilder};
+use std::sync::Arc;
+
+use crate::common::bit::ceil;
+use arrow::datatypes::*;
+use datafusion::arrow::{
+    array::*,
+    datatypes::{DataType, SchemaRef, TimeUnit},
+    error::Result as ArrowResult,
+    record_batch::RecordBatch,
+};
+
+pub(crate) fn new_array_builders(
+    schema: &SchemaRef,
+    batch_size: usize,
+) -> Vec<Box<dyn ArrayBuilder>> {
+    schema
+        .fields()
+        .iter()
+        .map(|field| {
+            let dt = field.data_type();
+            if matches!(dt, DataType::Dictionary(_, _)) {
+                make_dict_builder(dt, batch_size)
+            } else {
+                make_builder(dt, batch_size)
+            }
+        })
+        .collect::<Vec<_>>()
+}
+
+macro_rules! primitive_dict_builder_inner_helper {
+    ($kt:ty, $vt:ty, $capacity:ident) => {
+        Box::new(PrimitiveDictionaryBuilder::<$kt, $vt>::with_capacity(
+            $capacity,
+            $capacity / 100,
+        ))
+    };
+}
+
+macro_rules! primitive_dict_builder_helper {
+    ($kt:ty, $vt:ident, $capacity:ident) => {
+        match $vt.as_ref() {
+            DataType::Int8 => {
+                primitive_dict_builder_inner_helper!($kt, Int8Type, $capacity)
+            }
+            DataType::Int16 => {
+                primitive_dict_builder_inner_helper!($kt, Int16Type, $capacity)
+            }
+            DataType::Int32 => {
+                primitive_dict_builder_inner_helper!($kt, Int32Type, $capacity)
+            }
+            DataType::Int64 => {
+                primitive_dict_builder_inner_helper!($kt, Int64Type, $capacity)
+            }
+            DataType::UInt8 => {
+                primitive_dict_builder_inner_helper!($kt, UInt8Type, $capacity)
+            }
+            DataType::UInt16 => {
+                primitive_dict_builder_inner_helper!($kt, UInt16Type, $capacity)
+            }
+            DataType::UInt32 => {
+                primitive_dict_builder_inner_helper!($kt, UInt32Type, $capacity)
+            }
+            DataType::UInt64 => {
+                primitive_dict_builder_inner_helper!($kt, UInt64Type, $capacity)
+            }
+            DataType::Float32 => {
+                primitive_dict_builder_inner_helper!($kt, Float32Type, $capacity)
+            }
+            DataType::Float64 => {
+                primitive_dict_builder_inner_helper!($kt, Float64Type, $capacity)
+            }
+            DataType::Decimal128(p, s) => {
+                let keys_builder = PrimitiveBuilder::<$kt>::new();
+                let values_builder =
+                    Decimal128Builder::new().with_data_type(DataType::Decimal128(*p, *s));
+                Box::new(
+                    PrimitiveDictionaryBuilder::<$kt, Decimal128Type>::new_from_empty_builders(
+                        keys_builder,
+                        values_builder,
+                    ),
+                )
+            }
+            DataType::Timestamp(TimeUnit::Microsecond, timezone) => {
+                let keys_builder = PrimitiveBuilder::<$kt>::new();
+                let values_builder = TimestampMicrosecondBuilder::new()
+                    .with_data_type(DataType::Timestamp(TimeUnit::Microsecond, timezone.clone()));
+                Box::new(
+                    PrimitiveDictionaryBuilder::<$kt, TimestampMicrosecondType>::new_from_empty_builders(
+                        keys_builder,
+                        values_builder,
+                    ),
+                )
+            }
+            DataType::Date32 => {
+                primitive_dict_builder_inner_helper!($kt, Date32Type, $capacity)
+            }
+            DataType::Date64 => {
+                primitive_dict_builder_inner_helper!($kt, Date64Type, $capacity)
+            }
+            t => unimplemented!("{:?} is not supported", t),
+        }
+    };
+}
+
+macro_rules! byte_dict_builder_inner_helper {
+    ($kt:ty, $capacity:ident, $builder:ident) => {
+        Box::new($builder::<$kt>::with_capacity(
+            $capacity,
+            $capacity / 100,
+            $capacity,
+        ))
+    };
+}
+
+/// Returns a dictionary array builder with capacity `capacity` that corresponds to the data type
+/// `DataType`. This function is useful for constructing arrays from arbitrary vectors with a
+/// known/expected schema.
+/// TODO: move this upstream.
+fn make_dict_builder(datatype: &DataType, capacity: usize) -> Box<dyn ArrayBuilder> { + match datatype { + DataType::Dictionary(key_type, value_type) if value_type.is_primitive() => { + match key_type.as_ref() { + DataType::Int8 => primitive_dict_builder_helper!(Int8Type, value_type, capacity), + DataType::Int16 => primitive_dict_builder_helper!(Int16Type, value_type, capacity), + DataType::Int32 => primitive_dict_builder_helper!(Int32Type, value_type, capacity), + DataType::Int64 => primitive_dict_builder_helper!(Int64Type, value_type, capacity), + DataType::UInt8 => primitive_dict_builder_helper!(UInt8Type, value_type, capacity), + DataType::UInt16 => { + primitive_dict_builder_helper!(UInt16Type, value_type, capacity) + } + DataType::UInt32 => { + primitive_dict_builder_helper!(UInt32Type, value_type, capacity) + } + DataType::UInt64 => { + primitive_dict_builder_helper!(UInt64Type, value_type, capacity) + } + _ => unreachable!(""), + } + } + DataType::Dictionary(key_type, value_type) + if matches!(value_type.as_ref(), DataType::Utf8) => + { + match key_type.as_ref() { + DataType::Int8 => { + byte_dict_builder_inner_helper!(Int8Type, capacity, StringDictionaryBuilder) + } + DataType::Int16 => { + byte_dict_builder_inner_helper!(Int16Type, capacity, StringDictionaryBuilder) + } + DataType::Int32 => { + byte_dict_builder_inner_helper!(Int32Type, capacity, StringDictionaryBuilder) + } + DataType::Int64 => { + byte_dict_builder_inner_helper!(Int64Type, capacity, StringDictionaryBuilder) + } + DataType::UInt8 => { + byte_dict_builder_inner_helper!(UInt8Type, capacity, StringDictionaryBuilder) + } + DataType::UInt16 => { + byte_dict_builder_inner_helper!(UInt16Type, capacity, StringDictionaryBuilder) + } + DataType::UInt32 => { + byte_dict_builder_inner_helper!(UInt32Type, capacity, StringDictionaryBuilder) + } + DataType::UInt64 => { + byte_dict_builder_inner_helper!(UInt64Type, capacity, StringDictionaryBuilder) + } + _ => unreachable!(""), + } + } + DataType::Dictionary(key_type, value_type) + if matches!(value_type.as_ref(), DataType::LargeUtf8) => + { + match key_type.as_ref() { + DataType::Int8 => byte_dict_builder_inner_helper!( + Int8Type, + capacity, + LargeStringDictionaryBuilder + ), + DataType::Int16 => byte_dict_builder_inner_helper!( + Int16Type, + capacity, + LargeStringDictionaryBuilder + ), + DataType::Int32 => byte_dict_builder_inner_helper!( + Int32Type, + capacity, + LargeStringDictionaryBuilder + ), + DataType::Int64 => byte_dict_builder_inner_helper!( + Int64Type, + capacity, + LargeStringDictionaryBuilder + ), + DataType::UInt8 => byte_dict_builder_inner_helper!( + UInt8Type, + capacity, + LargeStringDictionaryBuilder + ), + DataType::UInt16 => { + byte_dict_builder_inner_helper!( + UInt16Type, + capacity, + LargeStringDictionaryBuilder + ) + } + DataType::UInt32 => { + byte_dict_builder_inner_helper!( + UInt32Type, + capacity, + LargeStringDictionaryBuilder + ) + } + DataType::UInt64 => { + byte_dict_builder_inner_helper!( + UInt64Type, + capacity, + LargeStringDictionaryBuilder + ) + } + _ => unreachable!(""), + } + } + DataType::Dictionary(key_type, value_type) + if matches!(value_type.as_ref(), DataType::Binary) => + { + match key_type.as_ref() { + DataType::Int8 => { + byte_dict_builder_inner_helper!(Int8Type, capacity, BinaryDictionaryBuilder) + } + DataType::Int16 => { + byte_dict_builder_inner_helper!(Int16Type, capacity, BinaryDictionaryBuilder) + } + DataType::Int32 => { + byte_dict_builder_inner_helper!(Int32Type, capacity, BinaryDictionaryBuilder) + } + 
DataType::Int64 => { + byte_dict_builder_inner_helper!(Int64Type, capacity, BinaryDictionaryBuilder) + } + DataType::UInt8 => { + byte_dict_builder_inner_helper!(UInt8Type, capacity, BinaryDictionaryBuilder) + } + DataType::UInt16 => { + byte_dict_builder_inner_helper!(UInt16Type, capacity, BinaryDictionaryBuilder) + } + DataType::UInt32 => { + byte_dict_builder_inner_helper!(UInt32Type, capacity, BinaryDictionaryBuilder) + } + DataType::UInt64 => { + byte_dict_builder_inner_helper!(UInt64Type, capacity, BinaryDictionaryBuilder) + } + _ => unreachable!(""), + } + } + DataType::Dictionary(key_type, value_type) + if matches!(value_type.as_ref(), DataType::LargeBinary) => + { + match key_type.as_ref() { + DataType::Int8 => byte_dict_builder_inner_helper!( + Int8Type, + capacity, + LargeBinaryDictionaryBuilder + ), + DataType::Int16 => byte_dict_builder_inner_helper!( + Int16Type, + capacity, + LargeBinaryDictionaryBuilder + ), + DataType::Int32 => byte_dict_builder_inner_helper!( + Int32Type, + capacity, + LargeBinaryDictionaryBuilder + ), + DataType::Int64 => byte_dict_builder_inner_helper!( + Int64Type, + capacity, + LargeBinaryDictionaryBuilder + ), + DataType::UInt8 => byte_dict_builder_inner_helper!( + UInt8Type, + capacity, + LargeBinaryDictionaryBuilder + ), + DataType::UInt16 => { + byte_dict_builder_inner_helper!( + UInt16Type, + capacity, + LargeBinaryDictionaryBuilder + ) + } + DataType::UInt32 => { + byte_dict_builder_inner_helper!( + UInt32Type, + capacity, + LargeBinaryDictionaryBuilder + ) + } + DataType::UInt64 => { + byte_dict_builder_inner_helper!( + UInt64Type, + capacity, + LargeBinaryDictionaryBuilder + ) + } + _ => unreachable!(""), + } + } + t => panic!("Data type {t:?} is not currently supported"), + } +} + +pub(crate) fn slot_size(len: usize, data_type: &DataType) -> usize { + match data_type { + DataType::Boolean => ceil(len, 8), + DataType::Int8 => len, + DataType::Int16 => len * 2, + DataType::Int32 => len * 4, + DataType::Int64 => len * 8, + DataType::UInt8 => len, + DataType::UInt16 => len * 2, + DataType::UInt32 => len * 4, + DataType::UInt64 => len * 8, + DataType::Float32 => len * 4, + DataType::Float64 => len * 8, + DataType::Date32 => len * 4, + DataType::Date64 => len * 8, + DataType::Time32(TimeUnit::Second) => len * 4, + DataType::Time32(TimeUnit::Millisecond) => len * 4, + DataType::Time64(TimeUnit::Microsecond) => len * 8, + DataType::Time64(TimeUnit::Nanosecond) => len * 8, + // TODO: this is not accurate, but should be good enough for now + DataType::Utf8 => len * 100 + len * 4, + DataType::LargeUtf8 => len * 100 + len * 8, + DataType::Decimal128(_, _) => len * 16, + DataType::Dictionary(key_type, value_type) => { + // TODO: this is not accurate, but should be good enough for now + slot_size(len, key_type.as_ref()) + slot_size(len / 10, value_type.as_ref()) + } + // TODO: this is not accurate, but should be good enough for now + DataType::Binary => len * 100 + len * 4, + DataType::LargeBinary => len * 100 + len * 8, + DataType::FixedSizeBinary(s) => len * (*s as usize), + DataType::Timestamp(_, _) => len * 8, + dt => unimplemented!( + "{}", + format!("data type {dt} not supported in shuffle write") + ), + } +} + +pub(crate) fn append_columns( + to: &mut Box<dyn ArrayBuilder>, + from: &Arc<dyn Array>, + indices: &[usize], + data_type: &DataType, +) { + /// Append values from `from` to `to` using `indices`. + macro_rules! append { + ($arrowty:ident) => {{ + type B = paste::paste! {[< $arrowty Builder >]}; + type A = paste::paste! 
{[< $arrowty Array >]};
+            let t = to.as_any_mut().downcast_mut::<B>().unwrap();
+            let f = from.as_any().downcast_ref::<A>().unwrap();
+            for &i in indices {
+                if f.is_valid(i) {
+                    t.append_value(f.value(i));
+                } else {
+                    t.append_null();
+                }
+            }
+        }};
+    }
+
+    /// Some array builders (e.g. `FixedSizeBinary`) return a `Result` from their
+    /// `append_value` method.
+    macro_rules! append_unwrap {
+        ($arrowty:ident) => {{
+            type B = paste::paste! {[< $arrowty Builder >]};
+            type A = paste::paste! {[< $arrowty Array >]};
+            let t = to.as_any_mut().downcast_mut::<B>().unwrap();
+            let f = from.as_any().downcast_ref::<A>().unwrap();
+            for &i in indices {
+                if f.is_valid(i) {
+                    t.append_value(f.value(i)).unwrap();
+                } else {
+                    t.append_null();
+                }
+            }
+        }};
+    }
+
+    /// Appends values from a dictionary array to a dictionary builder.
+    macro_rules! append_dict {
+        ($kt:ty, $builder:ty, $dict_array:ty) => {{
+            let t = to.as_any_mut().downcast_mut::<$builder>().unwrap();
+            let f = from
+                .as_any()
+                .downcast_ref::<DictionaryArray<$kt>>()
+                .unwrap()
+                .downcast_dict::<$dict_array>()
+                .unwrap();
+            for &i in indices {
+                if f.is_valid(i) {
+                    t.append_value(f.value(i));
+                } else {
+                    t.append_null();
+                }
+            }
+        }};
+    }
+
+    macro_rules! append_dict_helper {
+        ($kt:ident, $ty:ty, $dict_array:ty) => {{
+            match $kt.as_ref() {
+                DataType::Int8 => append_dict!(Int8Type, PrimitiveDictionaryBuilder<Int8Type, $ty>, $dict_array),
+                DataType::Int16 => append_dict!(Int16Type, PrimitiveDictionaryBuilder<Int16Type, $ty>, $dict_array),
+                DataType::Int32 => append_dict!(Int32Type, PrimitiveDictionaryBuilder<Int32Type, $ty>, $dict_array),
+                DataType::Int64 => append_dict!(Int64Type, PrimitiveDictionaryBuilder<Int64Type, $ty>, $dict_array),
+                DataType::UInt8 => append_dict!(UInt8Type, PrimitiveDictionaryBuilder<UInt8Type, $ty>, $dict_array),
+                DataType::UInt16 => {
+                    append_dict!(UInt16Type, PrimitiveDictionaryBuilder<UInt16Type, $ty>, $dict_array)
+                }
+                DataType::UInt32 => {
+                    append_dict!(UInt32Type, PrimitiveDictionaryBuilder<UInt32Type, $ty>, $dict_array)
+                }
+                DataType::UInt64 => {
+                    append_dict!(UInt64Type, PrimitiveDictionaryBuilder<UInt64Type, $ty>, $dict_array)
+                }
+                _ => unreachable!("Unknown key type for dictionary"),
+            }
+        }};
+    }
+
+    macro_rules!
primitive_append_dict_helper { + ($kt:ident, $vt:ident) => { + match $vt.as_ref() { + DataType::Int8 => { + append_dict_helper!($kt, Int8Type, Int8Array) + } + DataType::Int16 => { + append_dict_helper!($kt, Int16Type, Int16Array) + } + DataType::Int32 => { + append_dict_helper!($kt, Int32Type, Int32Array) + } + DataType::Int64 => { + append_dict_helper!($kt, Int64Type, Int64Array) + } + DataType::UInt8 => { + append_dict_helper!($kt, UInt8Type, UInt8Array) + } + DataType::UInt16 => { + append_dict_helper!($kt, UInt16Type, UInt16Array) + } + DataType::UInt32 => { + append_dict_helper!($kt, UInt32Type, UInt32Array) + } + DataType::UInt64 => { + append_dict_helper!($kt, UInt64Type, UInt64Array) + } + DataType::Float32 => { + append_dict_helper!($kt, Float32Type, Float32Array) + } + DataType::Float64 => { + append_dict_helper!($kt, Float64Type, Float64Array) + } + DataType::Decimal128(_, _) => { + append_dict_helper!($kt, Decimal128Type, Decimal128Array) + } + DataType::Timestamp(TimeUnit::Microsecond, _) => { + append_dict_helper!($kt, TimestampMicrosecondType, TimestampMicrosecondArray) + } + DataType::Date32 => { + append_dict_helper!($kt, Date32Type, Date32Array) + } + DataType::Date64 => { + append_dict_helper!($kt, Date64Type, Date64Array) + } + t => unimplemented!("{:?} is not supported for appending dictionary builder", t), + } + }; + } + + macro_rules! append_byte_dict { + ($kt:ident, $byte_type:ty, $array_type:ty) => {{ + match $kt.as_ref() { + DataType::Int8 => { + append_dict!(Int8Type, GenericByteDictionaryBuilder<Int8Type, $byte_type>, $array_type) + } + DataType::Int16 => { + append_dict!(Int16Type, GenericByteDictionaryBuilder<Int16Type, $byte_type>, $array_type) + } + DataType::Int32 => { + append_dict!(Int32Type, GenericByteDictionaryBuilder<Int32Type, $byte_type>, $array_type) + } + DataType::Int64 => { + append_dict!(Int64Type, GenericByteDictionaryBuilder<Int64Type, $byte_type>, $array_type) + } + DataType::UInt8 => { + append_dict!(UInt8Type, GenericByteDictionaryBuilder<UInt8Type, $byte_type>, $array_type) + } + DataType::UInt16 => { + append_dict!(UInt16Type, GenericByteDictionaryBuilder<UInt16Type, $byte_type>, $array_type) + } + DataType::UInt32 => { + append_dict!(UInt32Type, GenericByteDictionaryBuilder<UInt32Type, $byte_type>, $array_type) + } + DataType::UInt64 => { + append_dict!(UInt64Type, GenericByteDictionaryBuilder<UInt64Type, $byte_type>, $array_type) + } + _ => unreachable!("Unknown key type for dictionary"), + } + }}; + } + + match data_type { + DataType::Boolean => append!(Boolean), + DataType::Int8 => append!(Int8), + DataType::Int16 => append!(Int16), + DataType::Int32 => append!(Int32), + DataType::Int64 => append!(Int64), + DataType::UInt8 => append!(UInt8), + DataType::UInt16 => append!(UInt16), + DataType::UInt32 => append!(UInt32), + DataType::UInt64 => append!(UInt64), + DataType::Float32 => append!(Float32), + DataType::Float64 => append!(Float64), + DataType::Date32 => append!(Date32), + DataType::Date64 => append!(Date64), + DataType::Time32(TimeUnit::Second) => append!(Time32Second), + DataType::Time32(TimeUnit::Millisecond) => append!(Time32Millisecond), + DataType::Time64(TimeUnit::Microsecond) => append!(Time64Microsecond), + DataType::Time64(TimeUnit::Nanosecond) => append!(Time64Nanosecond), + DataType::Timestamp(TimeUnit::Microsecond, _) => { + append!(TimestampMicrosecond) + } + DataType::Utf8 => append!(String), + DataType::LargeUtf8 => append!(LargeString), + DataType::Decimal128(_, _) => append!(Decimal128), + 
DataType::Dictionary(key_type, value_type) if value_type.is_primitive() => { + primitive_append_dict_helper!(key_type, value_type) + } + DataType::Dictionary(key_type, value_type) + if matches!(value_type.as_ref(), DataType::Utf8) => + { + append_byte_dict!(key_type, GenericStringType<i32>, StringArray) + } + DataType::Dictionary(key_type, value_type) + if matches!(value_type.as_ref(), DataType::LargeUtf8) => + { + append_byte_dict!(key_type, GenericStringType<i64>, LargeStringArray) + } + DataType::Dictionary(key_type, value_type) + if matches!(value_type.as_ref(), DataType::Binary) => + { + append_byte_dict!(key_type, GenericBinaryType<i32>, BinaryArray) + } + DataType::Dictionary(key_type, value_type) + if matches!(value_type.as_ref(), DataType::LargeBinary) => + { + append_byte_dict!(key_type, GenericBinaryType<i64>, LargeBinaryArray) + } + DataType::Binary => append!(Binary), + DataType::LargeBinary => append!(LargeBinary), + DataType::FixedSizeBinary(_) => append_unwrap!(FixedSizeBinary), + t => unimplemented!( + "{}", + format!("data type {} not supported in shuffle write", t) + ), + } +} + +pub(crate) fn make_batch( + schema: SchemaRef, + mut arrays: Vec<Box<dyn ArrayBuilder>>, + row_count: usize, +) -> ArrowResult<RecordBatch> { + let columns = arrays.iter_mut().map(|array| array.finish()).collect(); + let options = RecordBatchOptions::new().with_row_count(Option::from(row_count)); + RecordBatch::try_new_with_options(schema, columns, &options) +} diff --git a/native/core/src/execution/shuffle/codec.rs b/native/core/src/execution/shuffle/codec.rs index 3c735434c..8d7b45431 100644 --- a/native/core/src/execution/shuffle/codec.rs +++ b/native/core/src/execution/shuffle/codec.rs @@ -15,7 +15,10 @@ // specific language governing permissions and limitations // under the License. 
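The builders module above reduces to three entry points: new_array_builders creates one ArrayBuilder per schema field (routing dictionary types through make_dict_builder), append_columns gathers selected row indices from an input column into the matching builder, and make_batch freezes the builders into a RecordBatch with an explicit row count. A minimal sketch of how the three compose, assuming a crate-internal caller; the Int32 schema and the index list are illustrative, not taken from the commit:

    // Gather rows 0 and 2 of a three-row Int32 column into a new two-row batch.
    use std::sync::Arc;
    use arrow_array::{ArrayRef, Int32Array};
    use arrow_schema::{DataType, Field, Schema};

    fn gather_example() -> datafusion::error::Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        // One builder per field, pre-sized for the configured batch size.
        let mut builders = new_array_builders(&schema, 1024);
        let column: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30]));
        // Append only the rows routed to this partition.
        append_columns(&mut builders[0], &column, &[0, 2], &DataType::Int32);
        // Freeze the builders into a RecordBatch.
        let batch = make_batch(Arc::clone(&schema), builders, 2)?;
        assert_eq!(batch.num_rows(), 2);
        Ok(())
    }

In the shuffle writer itself the index list comes from hashing each row to an output partition; the sketch only exercises the gather path.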
+use crate::errors::{CometError, CometResult}; use crate::parquet::data_type::AsBytes; +use arrow::ipc::reader::StreamReader; +use arrow::ipc::writer::StreamWriter; use arrow_array::cast::AsArray; use arrow_array::types::Int32Type; use arrow_array::{ @@ -25,8 +28,13 @@ use arrow_array::{ }; use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer, ScalarBuffer}; use arrow_schema::{DataType, Field, Schema, TimeUnit}; +use bytes::Buf; +use crc32fast::Hasher; +use datafusion::error::Result; +use datafusion::physical_plan::metrics::Time; use datafusion_common::DataFusionError; -use std::io::Write; +use simd_adler32::Adler32; +use std::io::{Cursor, Read, Seek, SeekFrom, Write}; use std::sync::Arc; pub fn fast_codec_supports_type(data_type: &DataType) -> bool { @@ -706,3 +714,311 @@ mod test { .unwrap() } } + +#[derive(Debug, Clone)] +pub enum CompressionCodec { + None, + Lz4Frame, + Zstd(i32), + Snappy, +} + +pub struct ShuffleBlockWriter { + fast_encoding: bool, + codec: CompressionCodec, + encoded_schema: Vec<u8>, + header_bytes: Vec<u8>, +} + +impl ShuffleBlockWriter { + pub fn try_new( + schema: &Schema, + enable_fast_encoding: bool, + codec: CompressionCodec, + ) -> Result<Self> { + let mut encoded_schema = vec![]; + + let enable_fast_encoding = enable_fast_encoding + && schema + .fields() + .iter() + .all(|f| fast_codec_supports_type(f.data_type())); + + // encode the schema once and then reuse the encoded bytes for each batch + if enable_fast_encoding { + let mut w = BatchWriter::new(&mut encoded_schema); + w.write_partial_schema(schema)?; + } + + let header_bytes = Vec::with_capacity(24); + let mut cursor = Cursor::new(header_bytes); + + // leave space for compressed message length + cursor.seek_relative(8)?; + + // write number of columns because JVM side needs to know how many addresses to allocate + let field_count = schema.fields().len(); + cursor.write_all(&field_count.to_le_bytes())?; + + // write compression codec to header + let codec_header = match &codec { + CompressionCodec::Snappy => b"SNAP", + CompressionCodec::Lz4Frame => b"LZ4_", + CompressionCodec::Zstd(_) => b"ZSTD", + CompressionCodec::None => b"NONE", + }; + cursor.write_all(codec_header)?; + + // write encoding scheme + if enable_fast_encoding { + cursor.write_all(b"FAST")?; + } else { + cursor.write_all(b"AIPC")?; + } + + let header_bytes = cursor.into_inner(); + + Ok(Self { + fast_encoding: enable_fast_encoding, + codec, + encoded_schema, + header_bytes, + }) + } + + /// Writes given record batch as Arrow IPC bytes into given writer. + /// Returns number of bytes written. + pub fn write_batch<W: Write + Seek>( + &self, + batch: &RecordBatch, + output: &mut W, + ipc_time: &Time, + ) -> Result<usize> { + if batch.num_rows() == 0 { + return Ok(0); + } + + let mut timer = ipc_time.timer(); + let start_pos = output.stream_position()?; + + // write header + output.write_all(&self.header_bytes)?; + + let output = if self.fast_encoding { + match &self.codec { + CompressionCodec::None => { + let mut fast_writer = BatchWriter::new(&mut *output); + fast_writer.write_all(&self.encoded_schema)?; + fast_writer.write_batch(batch)?; + output + } + CompressionCodec::Lz4Frame => { + let mut wtr = lz4_flex::frame::FrameEncoder::new(output); + let mut fast_writer = BatchWriter::new(&mut wtr); + fast_writer.write_all(&self.encoded_schema)?; + fast_writer.write_batch(batch)?; + wtr.finish().map_err(|e| { + DataFusionError::Execution(format!("lz4 compression error: {}", e)) + })? 
+ } + CompressionCodec::Zstd(level) => { + let mut encoder = zstd::Encoder::new(output, *level)?; + let mut fast_writer = BatchWriter::new(&mut encoder); + fast_writer.write_all(&self.encoded_schema)?; + fast_writer.write_batch(batch)?; + encoder.finish()? + } + CompressionCodec::Snappy => { + let mut encoder = snap::write::FrameEncoder::new(output); + let mut fast_writer = BatchWriter::new(&mut encoder); + fast_writer.write_all(&self.encoded_schema)?; + fast_writer.write_batch(batch)?; + encoder.into_inner().map_err(|e| { + DataFusionError::Execution(format!("snappy compression error: {}", e)) + })? + } + } + } else { + match &self.codec { + CompressionCodec::None => { + let mut arrow_writer = StreamWriter::try_new(output, &batch.schema())?; + arrow_writer.write(batch)?; + arrow_writer.finish()?; + arrow_writer.into_inner()? + } + CompressionCodec::Lz4Frame => { + let mut wtr = lz4_flex::frame::FrameEncoder::new(output); + let mut arrow_writer = StreamWriter::try_new(&mut wtr, &batch.schema())?; + arrow_writer.write(batch)?; + arrow_writer.finish()?; + wtr.finish().map_err(|e| { + DataFusionError::Execution(format!("lz4 compression error: {}", e)) + })? + } + + CompressionCodec::Zstd(level) => { + let encoder = zstd::Encoder::new(output, *level)?; + let mut arrow_writer = StreamWriter::try_new(encoder, &batch.schema())?; + arrow_writer.write(batch)?; + arrow_writer.finish()?; + let zstd_encoder = arrow_writer.into_inner()?; + zstd_encoder.finish()? + } + + CompressionCodec::Snappy => { + let mut wtr = snap::write::FrameEncoder::new(output); + let mut arrow_writer = StreamWriter::try_new(&mut wtr, &batch.schema())?; + arrow_writer.write(batch)?; + arrow_writer.finish()?; + wtr.into_inner().map_err(|e| { + DataFusionError::Execution(format!("snappy compression error: {}", e)) + })? + } + } + }; + + // fill ipc length + let end_pos = output.stream_position()?; + let ipc_length = end_pos - start_pos - 8; + let max_size = i32::MAX as u64; + if ipc_length > max_size { + return Err(DataFusionError::Execution(format!( + "Shuffle block size {ipc_length} exceeds maximum size of {max_size}. 
\ + Try reducing batch size or increasing compression level" + ))); + } + + // fill ipc length + output.seek(SeekFrom::Start(start_pos))?; + output.write_all(&ipc_length.to_le_bytes())?; + output.seek(SeekFrom::Start(end_pos))?; + + timer.stop(); + + Ok((end_pos - start_pos) as usize) + } +} + +pub fn read_ipc_compressed(bytes: &[u8]) -> Result<RecordBatch> { + let fast_encoding = match &bytes[4..8] { + b"AIPC" => false, + b"FAST" => true, + other => { + return Err(DataFusionError::Internal(format!( + "invalid encoding schema: {other:?}" + ))) + } + }; + match &bytes[0..4] { + b"SNAP" => { + let mut decoder = snap::read::FrameDecoder::new(&bytes[8..]); + if fast_encoding { + // TODO avoid reading bytes into interim buffer + let mut buffer = vec![]; + decoder.read_to_end(&mut buffer)?; + let mut reader = BatchReader::new(&buffer); + reader.read_batch() + } else { + let mut reader = StreamReader::try_new(decoder, None)?; + reader.next().unwrap().map_err(|e| e.into()) + } + } + b"LZ4_" => { + let mut decoder = lz4_flex::frame::FrameDecoder::new(&bytes[8..]); + if fast_encoding { + // TODO avoid reading bytes into interim buffer + let mut buffer = vec![]; + decoder.read_to_end(&mut buffer)?; + let mut reader = BatchReader::new(&buffer); + reader.read_batch() + } else { + let mut reader = StreamReader::try_new(decoder, None)?; + reader.next().unwrap().map_err(|e| e.into()) + } + } + b"ZSTD" => { + let mut decoder = zstd::Decoder::new(&bytes[8..])?; + if fast_encoding { + // TODO avoid reading bytes into interim buffer + let mut buffer = vec![]; + decoder.read_to_end(&mut buffer)?; + let mut reader = BatchReader::new(&buffer); + reader.read_batch() + } else { + let mut reader = StreamReader::try_new(decoder, None)?; + reader.next().unwrap().map_err(|e| e.into()) + } + } + b"NONE" => { + if fast_encoding { + let mut reader = BatchReader::new(&bytes[8..]); + reader.read_batch() + } else { + let mut reader = StreamReader::try_new(&bytes[8..], None)?; + reader.next().unwrap().map_err(|e| e.into()) + } + } + other => Err(DataFusionError::Execution(format!( + "Failed to decode batch: invalid compression codec: {other:?}" + ))), + } +} + +/// Checksum algorithms for writing IPC bytes. +#[derive(Clone)] +pub(crate) enum Checksum { + /// CRC32 checksum algorithm. + CRC32(Hasher), + /// Adler32 checksum algorithm. + Adler32(Adler32), +} + +impl Checksum { + pub(crate) fn try_new(algo: i32, initial_opt: Option<u32>) -> CometResult<Self> { + match algo { + 0 => { + let hasher = if let Some(initial) = initial_opt { + Hasher::new_with_initial(initial) + } else { + Hasher::new() + }; + Ok(Checksum::CRC32(hasher)) + } + 1 => { + let hasher = if let Some(initial) = initial_opt { + // Note that Adler32 initial state is not zero. + // i.e., `Adler32::from_checksum(0)` is not the same as `Adler32::new()`. 
+ Adler32::from_checksum(initial) + } else { + Adler32::new() + }; + Ok(Checksum::Adler32(hasher)) + } + _ => Err(CometError::Internal( + "Unsupported checksum algorithm".to_string(), + )), + } + } + + pub(crate) fn update(&mut self, cursor: &mut Cursor<&mut Vec<u8>>) -> CometResult<()> { + match self { + Checksum::CRC32(hasher) => { + std::io::Seek::seek(cursor, SeekFrom::Start(0))?; + hasher.update(cursor.chunk()); + Ok(()) + } + Checksum::Adler32(hasher) => { + std::io::Seek::seek(cursor, SeekFrom::Start(0))?; + hasher.write(cursor.chunk()); + Ok(()) + } + } + } + + pub(crate) fn finalize(self) -> u32 { + match self { + Checksum::CRC32(hasher) => hasher.finalize(), + Checksum::Adler32(hasher) => hasher.finish(), + } + } +} diff --git a/native/core/src/execution/shuffle/mod.rs b/native/core/src/execution/shuffle/mod.rs index 716034a61..acd7ff551 100644 --- a/native/core/src/execution/shuffle/mod.rs +++ b/native/core/src/execution/shuffle/mod.rs @@ -15,13 +15,14 @@ // specific language governing permissions and limitations // under the License. -mod codec; +pub(crate) mod builders; +pub(crate) mod codec; mod list; mod map; pub mod row; mod shuffle_writer; + pub use codec::BatchWriter; -pub use shuffle_writer::{ - read_ipc_compressed, CompressionCodec, ShuffleBlockWriter, ShuffleWriterExec, -}; +pub use codec::{read_ipc_compressed, CompressionCodec, ShuffleBlockWriter}; +pub use shuffle_writer::ShuffleWriterExec; diff --git a/native/core/src/execution/shuffle/row.rs b/native/core/src/execution/shuffle/row.rs index f9ecf4790..540bebb1d 100644 --- a/native/core/src/execution/shuffle/row.rs +++ b/native/core/src/execution/shuffle/row.rs @@ -21,9 +21,9 @@ use crate::{ errors::CometError, execution::{ shuffle::{ + codec::{Checksum, ShuffleBlockWriter}, list::{append_list_element, SparkUnsafeArray}, map::{append_map_elements, get_map_key_value_dt, SparkUnsafeMap}, - shuffle_writer::{Checksum, ShuffleBlockWriter}, }, utils::bytes_to_i128, }, @@ -292,7 +292,7 @@ macro_rules! downcast_builder_ref { } // Expose the macro for other modules. -use crate::execution::shuffle::shuffle_writer::CompressionCodec; +use crate::execution::shuffle::CompressionCodec; pub(crate) use downcast_builder_ref; /// Appends field of row to the given struct builder. `dt` is the data type of the field. diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs index 70e832a73..696bfd05d 100644 --- a/native/core/src/execution/shuffle/shuffle_writer.rs +++ b/native/core/src/execution/shuffle/shuffle_writer.rs @@ -17,25 +17,14 @@ //! Defines the External shuffle repartition plan. 
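Taken together, the header written by ShuffleBlockWriter::try_new above gives each block the layout: bytes [0, 8) little-endian block length (the length field itself excluded), [8, 16) little-endian field count, [16, 20) a codec tag (SNAP, LZ4_, ZSTD, or NONE), [20, 24) an encoding tag (FAST or AIPC), then the compressed payload. read_ipc_compressed dispatches on the two tags and therefore expects its slice to begin at the codec tag. A round-trip sketch under those assumptions; the function name, Zstd level, and slice offset are inferred from the code above rather than quoted from it:

    // Encode one batch as a shuffle block, then decode it again.
    use std::io::Cursor;
    use arrow_array::RecordBatch;
    use datafusion::error::Result;
    use datafusion::physical_plan::metrics::Time;

    fn roundtrip(batch: &RecordBatch) -> Result<RecordBatch> {
        let writer =
            ShuffleBlockWriter::try_new(batch.schema().as_ref(), true, CompressionCodec::Zstd(1))?;
        let mut buf = Cursor::new(Vec::<u8>::new());
        writer.write_batch(batch, &mut buf, &Time::new())?;
        let bytes = buf.into_inner();
        // Skip the 8-byte length prefix and the 8-byte field count so the slice
        // starts at the codec tag, as read_ipc_compressed requires.
        read_ipc_compressed(&bytes[16..])
    }

The encoding tag is what lets the reader fall back to standard Arrow IPC decoding when fast encoding was disabled for the block.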
-use crate::execution::shuffle::codec::{fast_codec_supports_type, BatchReader}; -use crate::execution::shuffle::BatchWriter; -use crate::{ - common::bit::ceil, - errors::{CometError, CometResult}, +use crate::execution::shuffle::builders::{ + append_columns, make_batch, new_array_builders, slot_size, }; -use arrow::ipc::reader::StreamReader; -use arrow::{datatypes::*, ipc::writer::StreamWriter}; +use crate::execution::shuffle::{CompressionCodec, ShuffleBlockWriter}; use async_trait::async_trait; -use bytes::Buf; -use crc32fast::Hasher; use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType}; use datafusion::{ - arrow::{ - array::*, - datatypes::{DataType, SchemaRef, TimeUnit}, - error::{ArrowError, Result as ArrowResult}, - record_batch::RecordBatch, - }, + arrow::{array::*, datatypes::SchemaRef, error::ArrowError, record_batch::RecordBatch}, error::{DataFusionError, Result}, execution::{ context::TaskContext, @@ -57,7 +46,6 @@ use datafusion_physical_expr::EquivalenceProperties; use futures::executor::block_on; use futures::{lock::Mutex, Stream, StreamExt, TryFutureExt, TryStreamExt}; use itertools::Itertools; -use simd_adler32::Adler32; use std::io::Error; use std::{ any::Any, @@ -71,14 +59,6 @@ use std::{ }; use tokio::time::Instant; -/// The status of appending rows to a partition buffer. -enum AppendRowStatus { - /// The difference in memory usage after appending rows - MemDiff(Result<isize>), - /// The index of the next row to append - StartIndex(usize), -} - /// The shuffle writer operator maps each input partition to M output partitions based on a /// partitioning scheme. No guarantees are made about the order of the resulting partitions. #[derive(Debug)] @@ -101,6 +81,36 @@ pub struct ShuffleWriterExec { enable_fast_encoding: bool, } +impl ShuffleWriterExec { + /// Create a new ShuffleWriterExec + pub fn try_new( + input: Arc<dyn ExecutionPlan>, + partitioning: Partitioning, + codec: CompressionCodec, + output_data_file: String, + output_index_file: String, + enable_fast_encoding: bool, + ) -> Result<Self> { + let cache = PlanProperties::new( + EquivalenceProperties::new(Arc::clone(&input.schema())), + partitioning.clone(), + EmissionType::Final, + Boundedness::Bounded, + ); + + Ok(ShuffleWriterExec { + input, + partitioning, + metrics: ExecutionPlanMetricsSet::new(), + output_data_file, + output_index_file, + cache, + codec, + enable_fast_encoding, + }) + } +} + impl DisplayAs for ShuffleWriterExec { fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result { match t { @@ -122,6 +132,22 @@ impl ExecutionPlan for ShuffleWriterExec { self } + fn name(&self) -> &str { + "ShuffleWriterExec" + } + + fn metrics(&self) -> Option<MetricsSet> { + Some(self.metrics.clone_inner()) + } + + fn statistics(&self) -> Result<Statistics> { + self.input.statistics() + } + + fn properties(&self) -> &PlanProperties { + &self.cache + } + /// Get the schema for this execution plan fn schema(&self) -> SchemaRef { self.input.schema() @@ -175,569 +201,138 @@ impl ExecutionPlan for ShuffleWriterExec { .try_flatten(), ))) } +} - fn metrics(&self) -> Option<MetricsSet> { - Some(self.metrics.clone_inner()) - } +#[allow(clippy::too_many_arguments)] +async fn external_shuffle( + mut input: SendableRecordBatchStream, + partition_id: usize, + output_data_file: String, + output_index_file: String, + partitioning: Partitioning, + metrics: ShuffleRepartitionerMetrics, + context: Arc<TaskContext>, + codec: CompressionCodec, + enable_fast_encoding: bool, +) -> 
Result<SendableRecordBatchStream> { + let schema = input.schema(); + let mut repartitioner = ShuffleRepartitioner::try_new( + partition_id, + output_data_file, + output_index_file, + Arc::clone(&schema), + partitioning, + metrics, + context.runtime_env(), + context.session_config().batch_size(), + codec, + enable_fast_encoding, + )?; - fn statistics(&self) -> Result<Statistics> { - self.input.statistics() + while let Some(batch) = input.next().await { + // Block on the repartitioner to insert the batch and shuffle the rows + // into the corresponding partition buffer. + // Otherwise, pull the next batch from the input stream might overwrite the + // current batch in the repartitioner. + block_on(repartitioner.insert_batch(batch?))?; } + repartitioner.shuffle_write().await +} - fn properties(&self) -> &PlanProperties { - &self.cache - } +struct ShuffleRepartitionerMetrics { + /// metrics + baseline: BaselineMetrics, - fn name(&self) -> &str { - "ShuffleWriterExec" - } -} + /// Time to perform repartitioning + repart_time: Time, -impl ShuffleWriterExec { - /// Create a new ShuffleWriterExec - pub fn try_new( - input: Arc<dyn ExecutionPlan>, - partitioning: Partitioning, - codec: CompressionCodec, - output_data_file: String, - output_index_file: String, - enable_fast_encoding: bool, - ) -> Result<Self> { - let cache = PlanProperties::new( - EquivalenceProperties::new(Arc::clone(&input.schema())), - partitioning.clone(), - EmissionType::Final, - Boundedness::Bounded, - ); + /// Time interacting with memory pool + mempool_time: Time, - Ok(ShuffleWriterExec { - input, - partitioning, - metrics: ExecutionPlanMetricsSet::new(), - output_data_file, - output_index_file, - cache, - codec, - enable_fast_encoding, - }) + /// Time encoding batches to IPC format + encode_time: Time, + + /// Time spent writing to disk. Maps to "shuffleWriteTime" in Spark SQL Metrics. + write_time: Time, + + /// Number of input batches + input_batches: Count, + + /// count of spills during the execution of the operator + spill_count: Count, + + /// total spilled bytes during the execution of the operator + spilled_bytes: Count, + + /// The original size of spilled data. Different to `spilled_bytes` because of compression. + data_size: Count, +} + +impl ShuffleRepartitionerMetrics { + fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { + Self { + baseline: BaselineMetrics::new(metrics, partition), + repart_time: MetricBuilder::new(metrics).subset_time("repart_time", partition), + mempool_time: MetricBuilder::new(metrics).subset_time("mempool_time", partition), + encode_time: MetricBuilder::new(metrics).subset_time("encode_time", partition), + write_time: MetricBuilder::new(metrics).subset_time("write_time", partition), + input_batches: MetricBuilder::new(metrics).counter("input_batches", partition), + spill_count: MetricBuilder::new(metrics).spill_count(partition), + spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition), + data_size: MetricBuilder::new(metrics).counter("data_size", partition), + } } } -struct PartitionBuffer { - /// The schema of batches to be partitioned. +struct ShuffleRepartitioner { + output_data_file: String, + output_index_file: String, schema: SchemaRef, - /// The "frozen" Arrow IPC bytes of active data. They are frozen when `flush` is called. - frozen: Vec<u8>, - /// Array builders for appending rows into buffering batches. - active: Vec<Box<dyn ArrayBuilder>>, - /// The estimation of memory size of active builders in bytes when they are filled. 
- active_slots_mem_size: usize, - /// Number of rows in active builders. - num_active_rows: usize, - /// The maximum number of rows in a batch. Once `num_active_rows` reaches `batch_size`, - /// the active array builders will be frozen and appended to frozen buffer `frozen`. - batch_size: usize, - /// Memory reservation for this partition buffer. + buffered_partitions: Vec<PartitionBuffer>, + spills: Mutex<Vec<SpillInfo>>, + /// Sort expressions + /// Partitioning scheme to use + partitioning: Partitioning, + num_output_partitions: usize, + runtime: Arc<RuntimeEnv>, + metrics: ShuffleRepartitionerMetrics, reservation: MemoryReservation, - /// Writer that performs encoding and compression - shuffle_block_writer: ShuffleBlockWriter, + /// Hashes for each row in the current batch + hashes_buf: Vec<u32>, + /// Partition ids for each row in the current batch + partition_ids: Vec<u64>, + /// The configured batch size + batch_size: usize, } -impl PartitionBuffer { - fn try_new( +impl ShuffleRepartitioner { + #[allow(clippy::too_many_arguments)] + pub fn try_new( + partition_id: usize, + output_data_file: String, + output_index_file: String, schema: SchemaRef, + partitioning: Partitioning, + metrics: ShuffleRepartitionerMetrics, + runtime: Arc<RuntimeEnv>, batch_size: usize, - partition_id: usize, - runtime: &Arc<RuntimeEnv>, codec: CompressionCodec, enable_fast_encoding: bool, ) -> Result<Self> { - let reservation = MemoryConsumer::new(format!("PartitionBuffer[{}]", partition_id)) + let num_output_partitions = partitioning.partition_count(); + let reservation = MemoryConsumer::new(format!("ShuffleRepartitioner[{}]", partition_id)) .with_can_spill(true) .register(&runtime.memory_pool); - let shuffle_block_writer = - ShuffleBlockWriter::try_new(schema.as_ref(), enable_fast_encoding, codec)?; - Ok(Self { - schema, - frozen: vec![], - active: vec![], - active_slots_mem_size: 0, - num_active_rows: 0, - batch_size, - reservation, - shuffle_block_writer, - }) - } - - /// Initializes active builders if necessary. - /// Returns error if memory reservation fails. - fn init_active_if_necessary(&mut self, metrics: &ShuffleRepartitionerMetrics) -> Result<isize> { - let mut mem_diff = 0; - if self.active.is_empty() { - // Estimate the memory size of active builders - if self.active_slots_mem_size == 0 { - self.active_slots_mem_size = self - .schema - .fields() - .iter() - .map(|field| slot_size(self.batch_size, field.data_type())) - .sum::<usize>(); - } + let mut hashes_buf = Vec::with_capacity(batch_size); + let mut partition_ids = Vec::with_capacity(batch_size); - let mut mempool_timer = metrics.mempool_time.timer(); - self.reservation.try_grow(self.active_slots_mem_size)?; - mempool_timer.stop(); - - let mut repart_timer = metrics.repart_time.timer(); - self.active = new_array_builders(&self.schema, self.batch_size); - repart_timer.stop(); - - mem_diff += self.active_slots_mem_size as isize; - } - Ok(mem_diff) - } - - /// Appends rows of specified indices from columns into active array builders. 
- fn append_rows( - &mut self, - columns: &[ArrayRef], - indices: &[usize], - start_index: usize, - metrics: &ShuffleRepartitionerMetrics, - ) -> AppendRowStatus { - let mut mem_diff = 0; - let mut start = start_index; - - // lazy init because some partition may be empty - let init = self.init_active_if_necessary(metrics); - if init.is_err() { - return AppendRowStatus::StartIndex(start); - } - mem_diff += init.unwrap(); - - while start < indices.len() { - let end = (start + self.batch_size).min(indices.len()); - - let mut repart_timer = metrics.repart_time.timer(); - self.active - .iter_mut() - .zip(columns) - .for_each(|(builder, column)| { - append_columns(builder, column, &indices[start..end], column.data_type()); - }); - self.num_active_rows += end - start; - repart_timer.stop(); - - if self.num_active_rows >= self.batch_size { - let flush = self.flush(metrics); - if let Err(e) = flush { - return AppendRowStatus::MemDiff(Err(e)); - } - mem_diff += flush.unwrap(); - - let init = self.init_active_if_necessary(metrics); - if init.is_err() { - return AppendRowStatus::StartIndex(end); - } - mem_diff += init.unwrap(); - } - start = end; - } - AppendRowStatus::MemDiff(Ok(mem_diff)) - } - - /// flush active data into frozen bytes - fn flush(&mut self, metrics: &ShuffleRepartitionerMetrics) -> Result<isize> { - if self.num_active_rows == 0 { - return Ok(0); - } - let mut mem_diff = 0isize; - - // active -> staging - let active = std::mem::take(&mut self.active); - let num_rows = self.num_active_rows; - self.num_active_rows = 0; - - let mut mempool_timer = metrics.mempool_time.timer(); - self.reservation.try_shrink(self.active_slots_mem_size)?; - mempool_timer.stop(); - - let mut repart_timer = metrics.repart_time.timer(); - let frozen_batch = make_batch(Arc::clone(&self.schema), active, num_rows)?; - repart_timer.stop(); - - let frozen_capacity_old = self.frozen.capacity(); - let mut cursor = Cursor::new(&mut self.frozen); - cursor.seek(SeekFrom::End(0))?; - self.shuffle_block_writer - .write_batch(&frozen_batch, &mut cursor, &metrics.encode_time)?; - - mem_diff += (self.frozen.capacity() - frozen_capacity_old) as isize; - Ok(mem_diff) - } -} - -fn slot_size(len: usize, data_type: &DataType) -> usize { - match data_type { - DataType::Boolean => ceil(len, 8), - DataType::Int8 => len, - DataType::Int16 => len * 2, - DataType::Int32 => len * 4, - DataType::Int64 => len * 8, - DataType::UInt8 => len, - DataType::UInt16 => len * 2, - DataType::UInt32 => len * 4, - DataType::UInt64 => len * 8, - DataType::Float32 => len * 4, - DataType::Float64 => len * 8, - DataType::Date32 => len * 4, - DataType::Date64 => len * 8, - DataType::Time32(TimeUnit::Second) => len * 4, - DataType::Time32(TimeUnit::Millisecond) => len * 4, - DataType::Time64(TimeUnit::Microsecond) => len * 8, - DataType::Time64(TimeUnit::Nanosecond) => len * 8, - // TODO: this is not accurate, but should be good enough for now - DataType::Utf8 => len * 100 + len * 4, - DataType::LargeUtf8 => len * 100 + len * 8, - DataType::Decimal128(_, _) => len * 16, - DataType::Dictionary(key_type, value_type) => { - // TODO: this is not accurate, but should be good enough for now - slot_size(len, key_type.as_ref()) + slot_size(len / 10, value_type.as_ref()) - } - // TODO: this is not accurate, but should be good enough for now - DataType::Binary => len * 100 + len * 4, - DataType::LargeBinary => len * 100 + len * 8, - DataType::FixedSizeBinary(s) => len * (*s as usize), - DataType::Timestamp(_, _) => len * 8, - dt => unimplemented!( - "{}", - 
format!("data type {dt} not supported in shuffle write") - ), - } -} - -fn append_columns( - to: &mut Box<dyn ArrayBuilder>, - from: &Arc<dyn Array>, - indices: &[usize], - data_type: &DataType, -) { - /// Append values from `from` to `to` using `indices`. - macro_rules! append { - ($arrowty:ident) => {{ - type B = paste::paste! {[< $arrowty Builder >]}; - type A = paste::paste! {[< $arrowty Array >]}; - let t = to.as_any_mut().downcast_mut::<B>().unwrap(); - let f = from.as_any().downcast_ref::<A>().unwrap(); - for &i in indices { - if f.is_valid(i) { - t.append_value(f.value(i)); - } else { - t.append_null(); - } - } - }}; - } - - /// Some array builder (e.g. `FixedSizeBinary`) its `append_value` method returning - /// a `Result`. - macro_rules! append_unwrap { - ($arrowty:ident) => {{ - type B = paste::paste! {[< $arrowty Builder >]}; - type A = paste::paste! {[< $arrowty Array >]}; - let t = to.as_any_mut().downcast_mut::<B>().unwrap(); - let f = from.as_any().downcast_ref::<A>().unwrap(); - for &i in indices { - if f.is_valid(i) { - t.append_value(f.value(i)).unwrap(); - } else { - t.append_null(); - } - } - }}; - } - - /// Appends values from a dictionary array to a dictionary builder. - macro_rules! append_dict { - ($kt:ty, $builder:ty, $dict_array:ty) => {{ - let t = to.as_any_mut().downcast_mut::<$builder>().unwrap(); - let f = from - .as_any() - .downcast_ref::<DictionaryArray<$kt>>() - .unwrap() - .downcast_dict::<$dict_array>() - .unwrap(); - for &i in indices { - if f.is_valid(i) { - t.append_value(f.value(i)); - } else { - t.append_null(); - } - } - }}; - } - - macro_rules! append_dict_helper { - ($kt:ident, $ty:ty, $dict_array:ty) => {{ - match $kt.as_ref() { - DataType::Int8 => append_dict!(Int8Type, PrimitiveDictionaryBuilder<Int8Type, $ty>, $dict_array), - DataType::Int16 => append_dict!(Int16Type, PrimitiveDictionaryBuilder<Int16Type, $ty>, $dict_array), - DataType::Int32 => append_dict!(Int32Type, PrimitiveDictionaryBuilder<Int32Type, $ty>, $dict_array), - DataType::Int64 => append_dict!(Int64Type, PrimitiveDictionaryBuilder<Int64Type, $ty>, $dict_array), - DataType::UInt8 => append_dict!(UInt8Type, PrimitiveDictionaryBuilder<UInt8Type, $ty>, $dict_array), - DataType::UInt16 => { - append_dict!(UInt16Type, PrimitiveDictionaryBuilder<UInt16Type, $ty>, $dict_array) - } - DataType::UInt32 => { - append_dict!(UInt32Type, PrimitiveDictionaryBuilder<UInt32Type, $ty>, $dict_array) - } - DataType::UInt64 => { - append_dict!(UInt64Type, PrimitiveDictionaryBuilder<UInt64Type, $ty>, $dict_array) - } - _ => unreachable!("Unknown key type for dictionary"), - } - }}; - } - - macro_rules! 
primitive_append_dict_helper { - ($kt:ident, $vt:ident) => { - match $vt.as_ref() { - DataType::Int8 => { - append_dict_helper!($kt, Int8Type, Int8Array) - } - DataType::Int16 => { - append_dict_helper!($kt, Int16Type, Int16Array) - } - DataType::Int32 => { - append_dict_helper!($kt, Int32Type, Int32Array) - } - DataType::Int64 => { - append_dict_helper!($kt, Int64Type, Int64Array) - } - DataType::UInt8 => { - append_dict_helper!($kt, UInt8Type, UInt8Array) - } - DataType::UInt16 => { - append_dict_helper!($kt, UInt16Type, UInt16Array) - } - DataType::UInt32 => { - append_dict_helper!($kt, UInt32Type, UInt32Array) - } - DataType::UInt64 => { - append_dict_helper!($kt, UInt64Type, UInt64Array) - } - DataType::Float32 => { - append_dict_helper!($kt, Float32Type, Float32Array) - } - DataType::Float64 => { - append_dict_helper!($kt, Float64Type, Float64Array) - } - DataType::Decimal128(_, _) => { - append_dict_helper!($kt, Decimal128Type, Decimal128Array) - } - DataType::Timestamp(TimeUnit::Microsecond, _) => { - append_dict_helper!($kt, TimestampMicrosecondType, TimestampMicrosecondArray) - } - DataType::Date32 => { - append_dict_helper!($kt, Date32Type, Date32Array) - } - DataType::Date64 => { - append_dict_helper!($kt, Date64Type, Date64Array) - } - t => unimplemented!("{:?} is not supported for appending dictionary builder", t), - } - }; - } - - macro_rules! append_byte_dict { - ($kt:ident, $byte_type:ty, $array_type:ty) => {{ - match $kt.as_ref() { - DataType::Int8 => { - append_dict!(Int8Type, GenericByteDictionaryBuilder<Int8Type, $byte_type>, $array_type) - } - DataType::Int16 => { - append_dict!(Int16Type, GenericByteDictionaryBuilder<Int16Type, $byte_type>, $array_type) - } - DataType::Int32 => { - append_dict!(Int32Type, GenericByteDictionaryBuilder<Int32Type, $byte_type>, $array_type) - } - DataType::Int64 => { - append_dict!(Int64Type, GenericByteDictionaryBuilder<Int64Type, $byte_type>, $array_type) - } - DataType::UInt8 => { - append_dict!(UInt8Type, GenericByteDictionaryBuilder<UInt8Type, $byte_type>, $array_type) - } - DataType::UInt16 => { - append_dict!(UInt16Type, GenericByteDictionaryBuilder<UInt16Type, $byte_type>, $array_type) - } - DataType::UInt32 => { - append_dict!(UInt32Type, GenericByteDictionaryBuilder<UInt32Type, $byte_type>, $array_type) - } - DataType::UInt64 => { - append_dict!(UInt64Type, GenericByteDictionaryBuilder<UInt64Type, $byte_type>, $array_type) - } - _ => unreachable!("Unknown key type for dictionary"), - } - }}; - } - - match data_type { - DataType::Boolean => append!(Boolean), - DataType::Int8 => append!(Int8), - DataType::Int16 => append!(Int16), - DataType::Int32 => append!(Int32), - DataType::Int64 => append!(Int64), - DataType::UInt8 => append!(UInt8), - DataType::UInt16 => append!(UInt16), - DataType::UInt32 => append!(UInt32), - DataType::UInt64 => append!(UInt64), - DataType::Float32 => append!(Float32), - DataType::Float64 => append!(Float64), - DataType::Date32 => append!(Date32), - DataType::Date64 => append!(Date64), - DataType::Time32(TimeUnit::Second) => append!(Time32Second), - DataType::Time32(TimeUnit::Millisecond) => append!(Time32Millisecond), - DataType::Time64(TimeUnit::Microsecond) => append!(Time64Microsecond), - DataType::Time64(TimeUnit::Nanosecond) => append!(Time64Nanosecond), - DataType::Timestamp(TimeUnit::Microsecond, _) => { - append!(TimestampMicrosecond) - } - DataType::Utf8 => append!(String), - DataType::LargeUtf8 => append!(LargeString), - DataType::Decimal128(_, _) => append!(Decimal128), - 
DataType::Dictionary(key_type, value_type) if value_type.is_primitive() => { - primitive_append_dict_helper!(key_type, value_type) - } - DataType::Dictionary(key_type, value_type) - if matches!(value_type.as_ref(), DataType::Utf8) => - { - append_byte_dict!(key_type, GenericStringType<i32>, StringArray) - } - DataType::Dictionary(key_type, value_type) - if matches!(value_type.as_ref(), DataType::LargeUtf8) => - { - append_byte_dict!(key_type, GenericStringType<i64>, LargeStringArray) - } - DataType::Dictionary(key_type, value_type) - if matches!(value_type.as_ref(), DataType::Binary) => - { - append_byte_dict!(key_type, GenericBinaryType<i32>, BinaryArray) - } - DataType::Dictionary(key_type, value_type) - if matches!(value_type.as_ref(), DataType::LargeBinary) => - { - append_byte_dict!(key_type, GenericBinaryType<i64>, LargeBinaryArray) - } - DataType::Binary => append!(Binary), - DataType::LargeBinary => append!(LargeBinary), - DataType::FixedSizeBinary(_) => append_unwrap!(FixedSizeBinary), - t => unimplemented!( - "{}", - format!("data type {} not supported in shuffle write", t) - ), - } -} - -struct SpillInfo { - file: RefCountedTempFile, - offsets: Vec<u64>, -} - -struct ShuffleRepartitioner { - output_data_file: String, - output_index_file: String, - schema: SchemaRef, - buffered_partitions: Vec<PartitionBuffer>, - spills: Mutex<Vec<SpillInfo>>, - /// Sort expressions - /// Partitioning scheme to use - partitioning: Partitioning, - num_output_partitions: usize, - runtime: Arc<RuntimeEnv>, - metrics: ShuffleRepartitionerMetrics, - reservation: MemoryReservation, - /// Hashes for each row in the current batch - hashes_buf: Vec<u32>, - /// Partition ids for each row in the current batch - partition_ids: Vec<u64>, - /// The configured batch size - batch_size: usize, -} - -struct ShuffleRepartitionerMetrics { - /// metrics - baseline: BaselineMetrics, - - /// Time to perform repartitioning - repart_time: Time, - - /// Time interacting with memory pool - mempool_time: Time, - - /// Time encoding batches to IPC format - encode_time: Time, - - /// Time spent writing to disk. Maps to "shuffleWriteTime" in Spark SQL Metrics. - write_time: Time, - - /// Number of input batches - input_batches: Count, - - /// count of spills during the execution of the operator - spill_count: Count, - - /// total spilled bytes during the execution of the operator - spilled_bytes: Count, - - /// The original size of spilled data. Different to `spilled_bytes` because of compression. 
- data_size: Count, -} - -impl ShuffleRepartitionerMetrics { - fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { - Self { - baseline: BaselineMetrics::new(metrics, partition), - repart_time: MetricBuilder::new(metrics).subset_time("repart_time", partition), - mempool_time: MetricBuilder::new(metrics).subset_time("mempool_time", partition), - encode_time: MetricBuilder::new(metrics).subset_time("encode_time", partition), - write_time: MetricBuilder::new(metrics).subset_time("write_time", partition), - input_batches: MetricBuilder::new(metrics).counter("input_batches", partition), - spill_count: MetricBuilder::new(metrics).spill_count(partition), - spilled_bytes: MetricBuilder::new(metrics).spilled_bytes(partition), - data_size: MetricBuilder::new(metrics).counter("data_size", partition), - } - } -} - -impl ShuffleRepartitioner { - #[allow(clippy::too_many_arguments)] - pub fn try_new( - partition_id: usize, - output_data_file: String, - output_index_file: String, - schema: SchemaRef, - partitioning: Partitioning, - metrics: ShuffleRepartitionerMetrics, - runtime: Arc<RuntimeEnv>, - batch_size: usize, - codec: CompressionCodec, - enable_fast_encoding: bool, - ) -> Result<Self> { - let num_output_partitions = partitioning.partition_count(); - let reservation = MemoryConsumer::new(format!("ShuffleRepartitioner[{}]", partition_id)) - .with_can_spill(true) - .register(&runtime.memory_pool); - - let mut hashes_buf = Vec::with_capacity(batch_size); - let mut partition_ids = Vec::with_capacity(batch_size); - - // Safety: `hashes_buf` will be filled with valid values before being used. - // `partition_ids` will be filled with valid values before being used. - unsafe { - hashes_buf.set_len(batch_size); - partition_ids.set_len(batch_size); - } + // Safety: `hashes_buf` will be filled with valid values before being used. + // `partition_ids` will be filled with valid values before being used. 
+ unsafe { + hashes_buf.set_len(batch_size); + partition_ids.set_len(batch_size); + } Ok(Self { output_data_file, @@ -1129,43 +724,6 @@ impl ShuffleRepartitioner { } } -/// consume the `buffered_partitions` and do spill into a single temp shuffle output file -fn spill_into( - buffered_partitions: &mut [PartitionBuffer], - path: &Path, - num_output_partitions: usize, - metrics: &ShuffleRepartitionerMetrics, -) -> Result<Vec<u64>> { - let mut output_batches: Vec<Vec<u8>> = vec![vec![]; num_output_partitions]; - - for i in 0..num_output_partitions { - buffered_partitions[i].flush(metrics)?; - output_batches[i] = std::mem::take(&mut buffered_partitions[i].frozen); - } - let path = path.to_owned(); - - let mut write_timer = metrics.write_time.timer(); - - let mut offsets = vec![0; num_output_partitions + 1]; - let mut spill_data = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(path) - .map_err(|e| DataFusionError::Execution(format!("Error occurred while spilling {}", e)))?; - - for i in 0..num_output_partitions { - offsets[i] = spill_data.stream_position()?; - spill_data.write_all(&output_batches[i])?; - output_batches[i].clear(); - } - write_timer.stop(); - - // add one extra offset at last to ease partition length computation - offsets[num_output_partitions] = spill_data.stream_position()?; - Ok(offsets) -} - impl Debug for ShuffleRepartitioner { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct("ShuffleRepartitioner") @@ -1177,655 +735,209 @@ impl Debug for ShuffleRepartitioner { } } -#[allow(clippy::too_many_arguments)] -async fn external_shuffle( - mut input: SendableRecordBatchStream, - partition_id: usize, - output_data_file: String, - output_index_file: String, - partitioning: Partitioning, - metrics: ShuffleRepartitionerMetrics, - context: Arc<TaskContext>, - codec: CompressionCodec, - enable_fast_encoding: bool, -) -> Result<SendableRecordBatchStream> { - let schema = input.schema(); - let mut repartitioner = ShuffleRepartitioner::try_new( - partition_id, - output_data_file, - output_index_file, - Arc::clone(&schema), - partitioning, - metrics, - context.runtime_env(), - context.session_config().batch_size(), - codec, - enable_fast_encoding, - )?; - - while let Some(batch) = input.next().await { - // Block on the repartitioner to insert the batch and shuffle the rows - // into the corresponding partition buffer. - // Otherwise, pull the next batch from the input stream might overwrite the - // current batch in the repartitioner. - block_on(repartitioner.insert_batch(batch?))?; - } - repartitioner.shuffle_write().await -} - -fn new_array_builders(schema: &SchemaRef, batch_size: usize) -> Vec<Box<dyn ArrayBuilder>> { - schema - .fields() - .iter() - .map(|field| { - let dt = field.data_type(); - if matches!(dt, DataType::Dictionary(_, _)) { - make_dict_builder(dt, batch_size) - } else { - make_builder(dt, batch_size) - } - }) - .collect::<Vec<_>>() -} - -macro_rules! primitive_dict_builder_inner_helper { - ($kt:ty, $vt:ty, $capacity:ident) => { - Box::new(PrimitiveDictionaryBuilder::<$kt, $vt>::with_capacity( - $capacity, - $capacity / 100, - )) - }; -} - -macro_rules! 
primitive_dict_builder_helper { - ($kt:ty, $vt:ident, $capacity:ident) => { - match $vt.as_ref() { - DataType::Int8 => { - primitive_dict_builder_inner_helper!($kt, Int8Type, $capacity) - } - DataType::Int16 => { - primitive_dict_builder_inner_helper!($kt, Int16Type, $capacity) - } - DataType::Int32 => { - primitive_dict_builder_inner_helper!($kt, Int32Type, $capacity) - } - DataType::Int64 => { - primitive_dict_builder_inner_helper!($kt, Int64Type, $capacity) - } - DataType::UInt8 => { - primitive_dict_builder_inner_helper!($kt, UInt8Type, $capacity) - } - DataType::UInt16 => { - primitive_dict_builder_inner_helper!($kt, UInt16Type, $capacity) - } - DataType::UInt32 => { - primitive_dict_builder_inner_helper!($kt, UInt32Type, $capacity) - } - DataType::UInt64 => { - primitive_dict_builder_inner_helper!($kt, UInt64Type, $capacity) - } - DataType::Float32 => { - primitive_dict_builder_inner_helper!($kt, Float32Type, $capacity) - } - DataType::Float64 => { - primitive_dict_builder_inner_helper!($kt, Float64Type, $capacity) - } - DataType::Decimal128(p, s) => { - let keys_builder = PrimitiveBuilder::<$kt>::new(); - let values_builder = - Decimal128Builder::new().with_data_type(DataType::Decimal128(*p, *s)); - Box::new( - PrimitiveDictionaryBuilder::<$kt, Decimal128Type>::new_from_empty_builders( - keys_builder, - values_builder, - ), - ) - } - DataType::Timestamp(TimeUnit::Microsecond, timezone) => { - let keys_builder = PrimitiveBuilder::<$kt>::new(); - let values_builder = TimestampMicrosecondBuilder::new() - .with_data_type(DataType::Timestamp(TimeUnit::Microsecond, timezone.clone())); - Box::new( - PrimitiveDictionaryBuilder::<$kt, TimestampMicrosecondType>::new_from_empty_builders( - keys_builder, - values_builder, - ), - ) - } - DataType::Date32 => { - primitive_dict_builder_inner_helper!($kt, Date32Type, $capacity) - } - DataType::Date64 => { - primitive_dict_builder_inner_helper!($kt, Date64Type, $capacity) - } - t => unimplemented!("{:?} is not supported", t), - } - }; +/// The status of appending rows to a partition buffer. +enum AppendRowStatus { + /// The difference in memory usage after appending rows + MemDiff(Result<isize>), + /// The index of the next row to append + StartIndex(usize), } -macro_rules! byte_dict_builder_inner_helper { - ($kt:ty, $capacity:ident, $builder:ident) => { - Box::new($builder::<$kt>::with_capacity( - $capacity, - $capacity / 100, - $capacity, - )) - }; +struct PartitionBuffer { + /// The schema of batches to be partitioned. + schema: SchemaRef, + /// The "frozen" Arrow IPC bytes of active data. They are frozen when `flush` is called. + frozen: Vec<u8>, + /// Array builders for appending rows into buffering batches. + active: Vec<Box<dyn ArrayBuilder>>, + /// The estimation of memory size of active builders in bytes when they are filled. + active_slots_mem_size: usize, + /// Number of rows in active builders. + num_active_rows: usize, + /// The maximum number of rows in a batch. Once `num_active_rows` reaches `batch_size`, + /// the active array builders will be frozen and appended to frozen buffer `frozen`. + batch_size: usize, + /// Memory reservation for this partition buffer. + reservation: MemoryReservation, + /// Writer that performs encoding and compression + shuffle_block_writer: ShuffleBlockWriter, } -/// Returns a dictionary array builder with capacity `capacity` that corresponds to the datatype -/// `DataType` This function is useful to construct arrays from an arbitrary vectors with -/// known/expected schema. 
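//
// For context: `new_array_builders` routes dictionary types to this function
// because a dictionary builder takes separate capacities for keys and values.
// A sketch of what the byte-dictionary helper expands to for
// `Dictionary(Int32, Utf8)` (the capacity split mirrors the macro; the /100
// distinct-value estimate is a heuristic):
//
//     use arrow_array::builder::StringDictionaryBuilder;
//     use arrow_array::types::Int32Type;
//
//     let builder = StringDictionaryBuilder::<Int32Type>::with_capacity(
//         capacity,       // key slots, one per row
//         capacity / 100, // estimated number of distinct values
//         capacity,       // bytes reserved for the value data
//     );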
-/// TODO: move this to the upstream. -fn make_dict_builder(datatype: &DataType, capacity: usize) -> Box<dyn ArrayBuilder> { - match datatype { - DataType::Dictionary(key_type, value_type) if value_type.is_primitive() => { - match key_type.as_ref() { - DataType::Int8 => primitive_dict_builder_helper!(Int8Type, value_type, capacity), - DataType::Int16 => primitive_dict_builder_helper!(Int16Type, value_type, capacity), - DataType::Int32 => primitive_dict_builder_helper!(Int32Type, value_type, capacity), - DataType::Int64 => primitive_dict_builder_helper!(Int64Type, value_type, capacity), - DataType::UInt8 => primitive_dict_builder_helper!(UInt8Type, value_type, capacity), - DataType::UInt16 => { - primitive_dict_builder_helper!(UInt16Type, value_type, capacity) - } - DataType::UInt32 => { - primitive_dict_builder_helper!(UInt32Type, value_type, capacity) - } - DataType::UInt64 => { - primitive_dict_builder_helper!(UInt64Type, value_type, capacity) - } - _ => unreachable!(""), - } - } - DataType::Dictionary(key_type, value_type) - if matches!(value_type.as_ref(), DataType::Utf8) => - { - match key_type.as_ref() { - DataType::Int8 => { - byte_dict_builder_inner_helper!(Int8Type, capacity, StringDictionaryBuilder) - } - DataType::Int16 => { - byte_dict_builder_inner_helper!(Int16Type, capacity, StringDictionaryBuilder) - } - DataType::Int32 => { - byte_dict_builder_inner_helper!(Int32Type, capacity, StringDictionaryBuilder) - } - DataType::Int64 => { - byte_dict_builder_inner_helper!(Int64Type, capacity, StringDictionaryBuilder) - } - DataType::UInt8 => { - byte_dict_builder_inner_helper!(UInt8Type, capacity, StringDictionaryBuilder) - } - DataType::UInt16 => { - byte_dict_builder_inner_helper!(UInt16Type, capacity, StringDictionaryBuilder) - } - DataType::UInt32 => { - byte_dict_builder_inner_helper!(UInt32Type, capacity, StringDictionaryBuilder) - } - DataType::UInt64 => { - byte_dict_builder_inner_helper!(UInt64Type, capacity, StringDictionaryBuilder) - } - _ => unreachable!(""), - } - } - DataType::Dictionary(key_type, value_type) - if matches!(value_type.as_ref(), DataType::LargeUtf8) => - { - match key_type.as_ref() { - DataType::Int8 => byte_dict_builder_inner_helper!( - Int8Type, - capacity, - LargeStringDictionaryBuilder - ), - DataType::Int16 => byte_dict_builder_inner_helper!( - Int16Type, - capacity, - LargeStringDictionaryBuilder - ), - DataType::Int32 => byte_dict_builder_inner_helper!( - Int32Type, - capacity, - LargeStringDictionaryBuilder - ), - DataType::Int64 => byte_dict_builder_inner_helper!( - Int64Type, - capacity, - LargeStringDictionaryBuilder - ), - DataType::UInt8 => byte_dict_builder_inner_helper!( - UInt8Type, - capacity, - LargeStringDictionaryBuilder - ), - DataType::UInt16 => { - byte_dict_builder_inner_helper!( - UInt16Type, - capacity, - LargeStringDictionaryBuilder - ) - } - DataType::UInt32 => { - byte_dict_builder_inner_helper!( - UInt32Type, - capacity, - LargeStringDictionaryBuilder - ) - } - DataType::UInt64 => { - byte_dict_builder_inner_helper!( - UInt64Type, - capacity, - LargeStringDictionaryBuilder - ) - } - _ => unreachable!(""), - } - } - DataType::Dictionary(key_type, value_type) - if matches!(value_type.as_ref(), DataType::Binary) => - { - match key_type.as_ref() { - DataType::Int8 => { - byte_dict_builder_inner_helper!(Int8Type, capacity, BinaryDictionaryBuilder) - } - DataType::Int16 => { - byte_dict_builder_inner_helper!(Int16Type, capacity, BinaryDictionaryBuilder) - } - DataType::Int32 => { - byte_dict_builder_inner_helper!(Int32Type, 
capacity, BinaryDictionaryBuilder) - } - DataType::Int64 => { - byte_dict_builder_inner_helper!(Int64Type, capacity, BinaryDictionaryBuilder) - } - DataType::UInt8 => { - byte_dict_builder_inner_helper!(UInt8Type, capacity, BinaryDictionaryBuilder) - } - DataType::UInt16 => { - byte_dict_builder_inner_helper!(UInt16Type, capacity, BinaryDictionaryBuilder) - } - DataType::UInt32 => { - byte_dict_builder_inner_helper!(UInt32Type, capacity, BinaryDictionaryBuilder) - } - DataType::UInt64 => { - byte_dict_builder_inner_helper!(UInt64Type, capacity, BinaryDictionaryBuilder) - } - _ => unreachable!(""), - } - } - DataType::Dictionary(key_type, value_type) - if matches!(value_type.as_ref(), DataType::LargeBinary) => - { - match key_type.as_ref() { - DataType::Int8 => byte_dict_builder_inner_helper!( - Int8Type, - capacity, - LargeBinaryDictionaryBuilder - ), - DataType::Int16 => byte_dict_builder_inner_helper!( - Int16Type, - capacity, - LargeBinaryDictionaryBuilder - ), - DataType::Int32 => byte_dict_builder_inner_helper!( - Int32Type, - capacity, - LargeBinaryDictionaryBuilder - ), - DataType::Int64 => byte_dict_builder_inner_helper!( - Int64Type, - capacity, - LargeBinaryDictionaryBuilder - ), - DataType::UInt8 => byte_dict_builder_inner_helper!( - UInt8Type, - capacity, - LargeBinaryDictionaryBuilder - ), - DataType::UInt16 => { - byte_dict_builder_inner_helper!( - UInt16Type, - capacity, - LargeBinaryDictionaryBuilder - ) - } - DataType::UInt32 => { - byte_dict_builder_inner_helper!( - UInt32Type, - capacity, - LargeBinaryDictionaryBuilder - ) - } - DataType::UInt64 => { - byte_dict_builder_inner_helper!( - UInt64Type, - capacity, - LargeBinaryDictionaryBuilder - ) - } - _ => unreachable!(""), - } - } - t => panic!("Data type {t:?} is not currently supported"), +impl PartitionBuffer { + fn try_new( + schema: SchemaRef, + batch_size: usize, + partition_id: usize, + runtime: &Arc<RuntimeEnv>, + codec: CompressionCodec, + enable_fast_encoding: bool, + ) -> Result<Self> { + let reservation = MemoryConsumer::new(format!("PartitionBuffer[{}]", partition_id)) + .with_can_spill(true) + .register(&runtime.memory_pool); + let shuffle_block_writer = + ShuffleBlockWriter::try_new(schema.as_ref(), enable_fast_encoding, codec)?; + Ok(Self { + schema, + frozen: vec![], + active: vec![], + active_slots_mem_size: 0, + num_active_rows: 0, + batch_size, + reservation, + shuffle_block_writer, + }) } -} - -fn make_batch( - schema: SchemaRef, - mut arrays: Vec<Box<dyn ArrayBuilder>>, - row_count: usize, -) -> ArrowResult<RecordBatch> { - let columns = arrays.iter_mut().map(|array| array.finish()).collect(); - let options = RecordBatchOptions::new().with_row_count(Option::from(row_count)); - RecordBatch::try_new_with_options(schema, columns, &options) -} -/// Checksum algorithms for writing IPC bytes. -#[derive(Clone)] -pub(crate) enum Checksum { - /// CRC32 checksum algorithm. - CRC32(Hasher), - /// Adler32 checksum algorithm. - Adler32(Adler32), -} + /// Initializes active builders if necessary. + /// Returns error if memory reservation fails. 
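//
// Note the ordering below: the memory reservation is grown before the
// builders are allocated, so the pool can refuse the request (surfacing an
// error that leads to a spill) before any allocation happens. In a sketch:
//
//     reservation.try_grow(active_slots_mem_size)?; // may fail; nothing allocated yet
//     self.active = new_array_builders(&self.schema, self.batch_size);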
+ fn init_active_if_necessary(&mut self, metrics: &ShuffleRepartitionerMetrics) -> Result<isize> { + let mut mem_diff = 0; -impl Checksum { - pub(crate) fn try_new(algo: i32, initial_opt: Option<u32>) -> CometResult<Self> { - match algo { - 0 => { - let hasher = if let Some(initial) = initial_opt { - Hasher::new_with_initial(initial) - } else { - Hasher::new() - }; - Ok(Checksum::CRC32(hasher)) - } - 1 => { - let hasher = if let Some(initial) = initial_opt { - // Note that Adler32 initial state is not zero. - // i.e., `Adler32::from_checksum(0)` is not the same as `Adler32::new()`. - Adler32::from_checksum(initial) - } else { - Adler32::new() - }; - Ok(Checksum::Adler32(hasher)) + if self.active.is_empty() { + // Estimate the memory size of active builders + if self.active_slots_mem_size == 0 { + self.active_slots_mem_size = self + .schema + .fields() + .iter() + .map(|field| slot_size(self.batch_size, field.data_type())) + .sum::<usize>(); } - _ => Err(CometError::Internal( - "Unsupported checksum algorithm".to_string(), - )), - } - } - pub(crate) fn update(&mut self, cursor: &mut Cursor<&mut Vec<u8>>) -> CometResult<()> { - match self { - Checksum::CRC32(hasher) => { - std::io::Seek::seek(cursor, SeekFrom::Start(0))?; - hasher.update(cursor.chunk()); - Ok(()) - } - Checksum::Adler32(hasher) => { - std::io::Seek::seek(cursor, SeekFrom::Start(0))?; - hasher.write(cursor.chunk()); - Ok(()) - } - } - } + let mut mempool_timer = metrics.mempool_time.timer(); + self.reservation.try_grow(self.active_slots_mem_size)?; + mempool_timer.stop(); + + let mut repart_timer = metrics.repart_time.timer(); + self.active = new_array_builders(&self.schema, self.batch_size); + repart_timer.stop(); - pub(crate) fn finalize(self) -> u32 { - match self { - Checksum::CRC32(hasher) => hasher.finalize(), - Checksum::Adler32(hasher) => hasher.finish(), + mem_diff += self.active_slots_mem_size as isize; } + Ok(mem_diff) } -} - -#[derive(Debug, Clone)] -pub enum CompressionCodec { - None, - Lz4Frame, - Zstd(i32), - Snappy, -} -pub struct ShuffleBlockWriter { - fast_encoding: bool, - codec: CompressionCodec, - encoded_schema: Vec<u8>, - header_bytes: Vec<u8>, -} + /// Appends rows of specified indices from columns into active array builders. 
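//
// The `AppendRowStatus` return value makes this resumable: if a reservation
// fails mid-append, the index of the first unappended row is reported so the
// caller can spill and retry from that row. A hypothetical caller (names
// assumed for illustration):
//
//     match buffer.append_rows(&columns, &indices, 0, &metrics) {
//         AppendRowStatus::MemDiff(delta) => { let _bytes = delta?; } // all rows in
//         AppendRowStatus::StartIndex(next) => {
//             spill_all_partitions()?; // hypothetical: free memory first
//             buffer.append_rows(&columns, &indices, next, &metrics);
//         }
//     }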
+ fn append_rows( + &mut self, + columns: &[ArrayRef], + indices: &[usize], + start_index: usize, + metrics: &ShuffleRepartitionerMetrics, + ) -> AppendRowStatus { + let mut mem_diff = 0; + let mut start = start_index; -impl ShuffleBlockWriter { - pub fn try_new( - schema: &Schema, - enable_fast_encoding: bool, - codec: CompressionCodec, - ) -> Result<Self> { - let mut encoded_schema = vec![]; - - let enable_fast_encoding = enable_fast_encoding - && schema - .fields() - .iter() - .all(|f| fast_codec_supports_type(f.data_type())); - - // encode the schema once and then reuse the encoded bytes for each batch - if enable_fast_encoding { - let mut w = BatchWriter::new(&mut encoded_schema); - w.write_partial_schema(schema)?; + // lazy init because some partition may be empty + let init = self.init_active_if_necessary(metrics); + if init.is_err() { + return AppendRowStatus::StartIndex(start); } + mem_diff += init.unwrap(); - let header_bytes = Vec::with_capacity(24); - let mut cursor = Cursor::new(header_bytes); - - // leave space for compressed message length - cursor.seek_relative(8)?; + while start < indices.len() { + let end = (start + self.batch_size).min(indices.len()); - // write number of columns because JVM side needs to know how many addresses to allocate - let field_count = schema.fields().len(); - cursor.write_all(&field_count.to_le_bytes())?; + let mut repart_timer = metrics.repart_time.timer(); + self.active + .iter_mut() + .zip(columns) + .for_each(|(builder, column)| { + append_columns(builder, column, &indices[start..end], column.data_type()); + }); + self.num_active_rows += end - start; + repart_timer.stop(); - // write compression codec to header - let codec_header = match &codec { - CompressionCodec::Snappy => b"SNAP", - CompressionCodec::Lz4Frame => b"LZ4_", - CompressionCodec::Zstd(_) => b"ZSTD", - CompressionCodec::None => b"NONE", - }; - cursor.write_all(codec_header)?; + if self.num_active_rows >= self.batch_size { + let flush = self.flush(metrics); + if let Err(e) = flush { + return AppendRowStatus::MemDiff(Err(e)); + } + mem_diff += flush.unwrap(); - // write encoding scheme - if enable_fast_encoding { - cursor.write_all(b"FAST")?; - } else { - cursor.write_all(b"AIPC")?; + let init = self.init_active_if_necessary(metrics); + if init.is_err() { + return AppendRowStatus::StartIndex(end); + } + mem_diff += init.unwrap(); + } + start = end; } - - let header_bytes = cursor.into_inner(); - - Ok(Self { - fast_encoding: enable_fast_encoding, - codec, - encoded_schema, - header_bytes, - }) + AppendRowStatus::MemDiff(Ok(mem_diff)) } - /// Writes given record batch as Arrow IPC bytes into given writer. - /// Returns number of bytes written. 
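//
// Piecing together the header written in `try_new` above, each shuffle block
// appears to be laid out as:
//
//     [8 bytes] length of everything that follows, little-endian (backfilled last)
//     [8 bytes] field count, little-endian (a usize; tells the JVM side how
//               many addresses to allocate)
//     [4 bytes] codec magic: "SNAP" | "LZ4_" | "ZSTD" | "NONE"
//     [4 bytes] encoding magic: "FAST" (Comet codec) or "AIPC" (Arrow IPC)
//     [ ...  ] compressed batch payload
//
// `read_ipc_compressed` further below reads the codec magic at offset 0 and
// the encoding magic at offset 4, so it is evidently handed the block with
// the 16-byte length/field-count prefix already stripped.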
- pub fn write_batch<W: Write + Seek>( - &self, - batch: &RecordBatch, - output: &mut W, - ipc_time: &Time, - ) -> Result<usize> { - if batch.num_rows() == 0 { + /// flush active data into frozen bytes + fn flush(&mut self, metrics: &ShuffleRepartitionerMetrics) -> Result<isize> { + if self.num_active_rows == 0 { return Ok(0); } + let mut mem_diff = 0isize; - let mut timer = ipc_time.timer(); - let start_pos = output.stream_position()?; - - // write header - output.write_all(&self.header_bytes)?; + // active -> staging + let active = std::mem::take(&mut self.active); + let num_rows = self.num_active_rows; + self.num_active_rows = 0; - let output = if self.fast_encoding { - match &self.codec { - CompressionCodec::None => { - let mut fast_writer = BatchWriter::new(&mut *output); - fast_writer.write_all(&self.encoded_schema)?; - fast_writer.write_batch(batch)?; - output - } - CompressionCodec::Lz4Frame => { - let mut wtr = lz4_flex::frame::FrameEncoder::new(output); - let mut fast_writer = BatchWriter::new(&mut wtr); - fast_writer.write_all(&self.encoded_schema)?; - fast_writer.write_batch(batch)?; - wtr.finish().map_err(|e| { - DataFusionError::Execution(format!("lz4 compression error: {}", e)) - })? - } - CompressionCodec::Zstd(level) => { - let mut encoder = zstd::Encoder::new(output, *level)?; - let mut fast_writer = BatchWriter::new(&mut encoder); - fast_writer.write_all(&self.encoded_schema)?; - fast_writer.write_batch(batch)?; - encoder.finish()? - } - CompressionCodec::Snappy => { - let mut encoder = snap::write::FrameEncoder::new(output); - let mut fast_writer = BatchWriter::new(&mut encoder); - fast_writer.write_all(&self.encoded_schema)?; - fast_writer.write_batch(batch)?; - encoder.into_inner().map_err(|e| { - DataFusionError::Execution(format!("snappy compression error: {}", e)) - })? - } - } - } else { - match &self.codec { - CompressionCodec::None => { - let mut arrow_writer = StreamWriter::try_new(output, &batch.schema())?; - arrow_writer.write(batch)?; - arrow_writer.finish()?; - arrow_writer.into_inner()? - } - CompressionCodec::Lz4Frame => { - let mut wtr = lz4_flex::frame::FrameEncoder::new(output); - let mut arrow_writer = StreamWriter::try_new(&mut wtr, &batch.schema())?; - arrow_writer.write(batch)?; - arrow_writer.finish()?; - wtr.finish().map_err(|e| { - DataFusionError::Execution(format!("lz4 compression error: {}", e)) - })? - } + let mut mempool_timer = metrics.mempool_time.timer(); + self.reservation.try_shrink(self.active_slots_mem_size)?; + mempool_timer.stop(); - CompressionCodec::Zstd(level) => { - let encoder = zstd::Encoder::new(output, *level)?; - let mut arrow_writer = StreamWriter::try_new(encoder, &batch.schema())?; - arrow_writer.write(batch)?; - arrow_writer.finish()?; - let zstd_encoder = arrow_writer.into_inner()?; - zstd_encoder.finish()? - } + let mut repart_timer = metrics.repart_time.timer(); + let frozen_batch = make_batch(Arc::clone(&self.schema), active, num_rows)?; + repart_timer.stop(); - CompressionCodec::Snappy => { - let mut wtr = snap::write::FrameEncoder::new(output); - let mut arrow_writer = StreamWriter::try_new(&mut wtr, &batch.schema())?; - arrow_writer.write(batch)?; - arrow_writer.finish()?; - wtr.into_inner().map_err(|e| { - DataFusionError::Execution(format!("snappy compression error: {}", e)) - })? 
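//
// Each codec arm above recovers the sink via `finish()`/`into_inner()` rather
// than dropping the encoder, so trailing frame bytes are flushed before the
// caller backfills the 8-byte length slot. For example, with zstd:
//
//     let mut enc = zstd::Encoder::new(output, level)?; // wraps the sink
//     // ... write the encoded batch ...
//     let output = enc.finish()?; // flush the trailer and recover the sink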
- } - } - }; + let frozen_capacity_old = self.frozen.capacity(); + let mut cursor = Cursor::new(&mut self.frozen); + cursor.seek(SeekFrom::End(0))?; + self.shuffle_block_writer + .write_batch(&frozen_batch, &mut cursor, &metrics.encode_time)?; - // fill ipc length - let end_pos = output.stream_position()?; - let ipc_length = end_pos - start_pos - 8; - let max_size = i32::MAX as u64; - if ipc_length > max_size { - return Err(DataFusionError::Execution(format!( - "Shuffle block size {ipc_length} exceeds maximum size of {max_size}. \ - Try reducing batch size or increasing compression level" - ))); - } + mem_diff += (self.frozen.capacity() - frozen_capacity_old) as isize; + Ok(mem_diff) + } +} - // fill ipc length - output.seek(SeekFrom::Start(start_pos))?; - output.write_all(&ipc_length.to_le_bytes())?; - output.seek(SeekFrom::Start(end_pos))?; +struct SpillInfo { + file: RefCountedTempFile, + offsets: Vec<u64>, +} - timer.stop(); +/// consume the `buffered_partitions` and do spill into a single temp shuffle output file +fn spill_into( + buffered_partitions: &mut [PartitionBuffer], + path: &Path, + num_output_partitions: usize, + metrics: &ShuffleRepartitionerMetrics, +) -> Result<Vec<u64>> { + let mut output_batches: Vec<Vec<u8>> = vec![vec![]; num_output_partitions]; - Ok((end_pos - start_pos) as usize) + for i in 0..num_output_partitions { + buffered_partitions[i].flush(metrics)?; + output_batches[i] = std::mem::take(&mut buffered_partitions[i].frozen); } -} + let path = path.to_owned(); -pub fn read_ipc_compressed(bytes: &[u8]) -> Result<RecordBatch> { - let fast_encoding = match &bytes[4..8] { - b"AIPC" => false, - b"FAST" => true, - other => { - return Err(DataFusionError::Internal(format!( - "invalid encoding schema: {other:?}" - ))) - } - }; - match &bytes[0..4] { - b"SNAP" => { - let mut decoder = snap::read::FrameDecoder::new(&bytes[8..]); - if fast_encoding { - // TODO avoid reading bytes into interim buffer - let mut buffer = vec![]; - decoder.read_to_end(&mut buffer)?; - let mut reader = BatchReader::new(&buffer); - reader.read_batch() - } else { - let mut reader = StreamReader::try_new(decoder, None)?; - reader.next().unwrap().map_err(|e| e.into()) - } - } - b"LZ4_" => { - let mut decoder = lz4_flex::frame::FrameDecoder::new(&bytes[8..]); - if fast_encoding { - // TODO avoid reading bytes into interim buffer - let mut buffer = vec![]; - decoder.read_to_end(&mut buffer)?; - let mut reader = BatchReader::new(&buffer); - reader.read_batch() - } else { - let mut reader = StreamReader::try_new(decoder, None)?; - reader.next().unwrap().map_err(|e| e.into()) - } - } - b"ZSTD" => { - let mut decoder = zstd::Decoder::new(&bytes[8..])?; - if fast_encoding { - // TODO avoid reading bytes into interim buffer - let mut buffer = vec![]; - decoder.read_to_end(&mut buffer)?; - let mut reader = BatchReader::new(&buffer); - reader.read_batch() - } else { - let mut reader = StreamReader::try_new(decoder, None)?; - reader.next().unwrap().map_err(|e| e.into()) - } - } - b"NONE" => { - if fast_encoding { - let mut reader = BatchReader::new(&bytes[8..]); - reader.read_batch() - } else { - let mut reader = StreamReader::try_new(&bytes[8..], None)?; - reader.next().unwrap().map_err(|e| e.into()) - } - } - other => Err(DataFusionError::Execution(format!( - "Failed to decode batch: invalid compression codec: {other:?}" - ))), + let mut write_timer = metrics.write_time.timer(); + + let mut offsets = vec![0; num_output_partitions + 1]; + let mut spill_data = OpenOptions::new() + .write(true) + 
.create(true)
+ .truncate(true)
+ .open(path)
+ .map_err(|e| DataFusionError::Execution(format!("Error occurred while spilling {}", e)))?;
+
+ for i in 0..num_output_partitions {
+ offsets[i] = spill_data.stream_position()?;
+ spill_data.write_all(&output_batches[i])?;
+ output_batches[i].clear();
+ }
+ write_timer.stop();
+
+ // add one extra offset at the end to ease partition length computation
+ offsets[num_output_partitions] = spill_data.stream_position()?;
+ Ok(offsets)
}

/// A stream that yields no record batches, representing the end of output.
@@ -1867,6 +979,8 @@ fn pmod(hash: u32, n: usize) -> usize {
 #[cfg(test)]
 mod test {
 use super::*;
+ use crate::execution::shuffle::read_ipc_compressed;
+ use arrow_schema::{DataType, Field, Schema};
+ use datafusion::physical_plan::common::collect;
 use datafusion::physical_plan::memory::MemoryExec;
 use datafusion::prelude::SessionContext;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org