This is an automated email from the ASF dual-hosted git repository.
leekei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new e05a3f0 [TASK-470] Optimize null append in Arrow builder (#471)
e05a3f0 is described below
commit e05a3f038b02e2419cfc8e452162a3affc3852a7
Author: Anton Borisov <[email protected]>
AuthorDate: Mon Mar 30 22:49:58 2026 +0100
[TASK-470] Optimize null append in Arrow builder (#471)
---
crates/fluss/src/row/datum.rs | 116 +++++++++++++++++++++++++-----------------
1 file changed, 68 insertions(+), 48 deletions(-)
diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs
index 2f1d183..78dc549 100644
--- a/crates/fluss/src/row/datum.rs
+++ b/crates/fluss/src/row/datum.rs
@@ -540,11 +540,7 @@ fn append_fluss_array_to_list_builder(
for i in 0..arr.size() {
if arr.is_null_at(i) {
- // TODO: Datum::Null triggers a chain of downcast attempts in append_to.
- // For sparse arrays with many nulls, call append_null directly on the
- // typed inner builder to avoid the overhead.
- let null_datum = Datum::Null;
- null_datum.append_to(values_builder, &element_arrow_type)?;
+ append_null_for_type(values_builder, &element_arrow_type)?;
} else {
let datum = read_datum_from_fluss_array(arr, i, &element_fluss_type)?;
datum.append_to(values_builder, &element_arrow_type)?;
@@ -595,21 +591,78 @@ fn read_datum_from_fluss_array<'a>(
})
}
+fn append_null_for_type(
+ builder: &mut dyn ArrayBuilder,
+ data_type: &arrow_schema::DataType,
+) -> Result<()> {
+ macro_rules! downcast_null {
+ ($builder_type:ty) => {{
+ let b = builder
+ .as_any_mut()
+ .downcast_mut::<$builder_type>()
+ .ok_or_else(|| RowConvertError {
+ message: format!(
+ "Builder type mismatch: expected {} for {data_type:?}",
+ stringify!($builder_type),
+ ),
+ })?;
+ b.append_null();
+ Ok(())
+ }};
+ }
+
+ match data_type {
+ arrow_schema::DataType::Boolean => downcast_null!(BooleanBuilder),
+ arrow_schema::DataType::Int8 => downcast_null!(Int8Builder),
+ arrow_schema::DataType::Int16 => downcast_null!(Int16Builder),
+ arrow_schema::DataType::Int32 => downcast_null!(Int32Builder),
+ arrow_schema::DataType::Int64 => downcast_null!(Int64Builder),
+ arrow_schema::DataType::Float32 => downcast_null!(Float32Builder),
+ arrow_schema::DataType::Float64 => downcast_null!(Float64Builder),
+ arrow_schema::DataType::Utf8 => downcast_null!(StringBuilder),
+ arrow_schema::DataType::Binary => downcast_null!(BinaryBuilder),
+ arrow_schema::DataType::FixedSizeBinary(_) => downcast_null!(FixedSizeBinaryBuilder),
+ arrow_schema::DataType::Decimal128(_, _) => downcast_null!(Decimal128Builder),
+ arrow_schema::DataType::Date32 => downcast_null!(Date32Builder),
+ arrow_schema::DataType::Time32(arrow_schema::TimeUnit::Second) => {
+ downcast_null!(Time32SecondBuilder)
+ }
+ arrow_schema::DataType::Time32(arrow_schema::TimeUnit::Millisecond) => {
+ downcast_null!(Time32MillisecondBuilder)
+ }
+ arrow_schema::DataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
+ downcast_null!(Time64MicrosecondBuilder)
+ }
+ arrow_schema::DataType::Time64(arrow_schema::TimeUnit::Nanosecond) => {
+ downcast_null!(Time64NanosecondBuilder)
+ }
+ arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Second, _) => {
+ downcast_null!(TimestampSecondBuilder)
+ }
+ arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => {
+ downcast_null!(TimestampMillisecondBuilder)
+ }
+ arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => {
+ downcast_null!(TimestampMicrosecondBuilder)
+ }
+ arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => {
+ downcast_null!(TimestampNanosecondBuilder)
+ }
+ arrow_schema::DataType::List(_) => {
+ downcast_null!(ListBuilder<Box<dyn ArrayBuilder>>)
+ }
+ _ => Err(RowConvertError {
+ message: format!("Unsupported Arrow data type for null append: {data_type:?}"),
+ }),
+ }
+}
+
impl Datum<'_> {
pub fn append_to(
&self,
builder: &mut dyn ArrayBuilder,
data_type: &arrow_schema::DataType,
) -> Result<()> {
- macro_rules! append_null_to_arrow {
- ($builder_type:ty) => {
- if let Some(b) = builder.as_any_mut().downcast_mut::<$builder_type>() {
- b.append_null();
- return Ok(());
- }
- };
- }
-
macro_rules! append_value_to_arrow {
($builder_type:ty, $value:expr) => {
if let Some(b) = builder.as_any_mut().downcast_mut::<$builder_type>() {
@@ -620,40 +673,7 @@ impl Datum<'_> {
}
match self {
- Datum::Null => {
- append_null_to_arrow!(Int8Builder);
- append_null_to_arrow!(BooleanBuilder);
- append_null_to_arrow!(Int16Builder);
- append_null_to_arrow!(Int32Builder);
- append_null_to_arrow!(Int64Builder);
- append_null_to_arrow!(Float32Builder);
- append_null_to_arrow!(Float64Builder);
- append_null_to_arrow!(StringBuilder);
- append_null_to_arrow!(BinaryBuilder);
- append_null_to_arrow!(FixedSizeBinaryBuilder);
- append_null_to_arrow!(Decimal128Builder);
- append_null_to_arrow!(Date32Builder);
- append_null_to_arrow!(Time32SecondBuilder);
- append_null_to_arrow!(Time32MillisecondBuilder);
- append_null_to_arrow!(Time64MicrosecondBuilder);
- append_null_to_arrow!(Time64NanosecondBuilder);
- append_null_to_arrow!(TimestampSecondBuilder);
- append_null_to_arrow!(TimestampMillisecondBuilder);
- append_null_to_arrow!(TimestampMicrosecondBuilder);
- append_null_to_arrow!(TimestampNanosecondBuilder);
- if let arrow_schema::DataType::List(_) = data_type {
- let b = builder
- .as_any_mut()
- .downcast_mut::<ListBuilder<Box<dyn ArrayBuilder>>>()
- .ok_or_else(|| RowConvertError {
- message:
- "Expected ListBuilder<Box<dyn ArrayBuilder>> for List Arrow type"
- .to_string(),
- })?;
- b.append_null();
- return Ok(());
- }
- }
+ Datum::Null => return append_null_for_type(builder, data_type),
Datum::Bool(v) => append_value_to_arrow!(BooleanBuilder, *v),
Datum::Int8(v) => append_value_to_arrow!(Int8Builder, *v),
Datum::Int16(v) => append_value_to_arrow!(Int16Builder, *v),