This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 599a223  chore: refactor to typed Column Writers (#440)
599a223 is described below

commit 599a2235837495f8002eb8dab10ec33917f62304
Author: Anton Borisov <[email protected]>
AuthorDate: Wed Mar 11 13:23:26 2026 +0000

    chore: refactor to typed Column Writers (#440)
---
 crates/fluss/src/record/arrow.rs      | 169 ++------
 crates/fluss/src/row/column_writer.rs | 771 ++++++++++++++++++++++++++++++++++
 crates/fluss/src/row/datum.rs         |  79 ++--
 crates/fluss/src/row/mod.rs           |   3 +-
 4 files changed, 848 insertions(+), 174 deletions(-)

diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 7fd1619..a0dfc84 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -20,16 +20,9 @@ use crate::compression::ArrowCompressionInfo;
 use crate::error::{Error, Result};
 use crate::metadata::{DataType, RowType};
 use crate::record::{ChangeType, ScanRecord};
-use crate::row::field_getter::FieldGetter;
+use crate::row::column_writer::ColumnWriter;
 use crate::row::{ColumnarRow, InternalRow};
-use arrow::array::{
-    ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, 
Decimal128Builder,
-    FixedSizeBinaryBuilder, Float32Builder, Float64Builder, Int8Builder, 
Int16Builder,
-    Int32Builder, Int64Builder, StringBuilder, Time32MillisecondBuilder, 
Time32SecondBuilder,
-    Time64MicrosecondBuilder, Time64NanosecondBuilder, 
TimestampMicrosecondBuilder,
-    TimestampMillisecondBuilder, TimestampNanosecondBuilder, 
TimestampSecondBuilder, UInt8Builder,
-    UInt16Builder, UInt32Builder, UInt64Builder,
-};
+use arrow::array::{ArrayBuilder, ArrayRef};
 use arrow::{
     array::RecordBatch,
     buffer::Buffer,
@@ -154,10 +147,6 @@ pub const BUILDER_DEFAULT_OFFSET: i64 = 0;
 // TODO: Switch to byte-size-based is_full() like Java's ArrowWriter instead 
of a hard record cap.
 pub const DEFAULT_MAX_RECORD: i32 = 256;
 
-/// Estimated average byte size for variable-width columns (Utf8, Binary).
-/// Used to pre-allocate data buffers and avoid reallocations during batch 
building.
-const VARIABLE_WIDTH_AVG_BYTES: usize = 64;
-
 pub struct MemoryLogRecordsArrowBuilder {
     base_log_offset: i64,
     schema_id: i32,
@@ -234,8 +223,7 @@ impl ArrowRecordBatchInnerBuilder for 
PrebuiltRecordBatchBuilder {
 
 pub struct RowAppendRecordBatchBuilder {
     table_schema: SchemaRef,
-    arrow_column_builders: Vec<Box<dyn ArrayBuilder>>,
-    field_getters: Box<[FieldGetter]>,
+    column_writers: Vec<ColumnWriter>,
     records_count: i32,
 }
 
@@ -243,117 +231,31 @@ impl RowAppendRecordBatchBuilder {
     pub fn new(row_type: &RowType) -> Result<Self> {
         let capacity = DEFAULT_MAX_RECORD as usize;
         let schema_ref = to_arrow_schema(row_type)?;
-        let builders: Result<Vec<_>> = schema_ref
+        let writers: Result<Vec<_>> = row_type
             .fields()
             .iter()
-            .map(|field| Self::create_builder(field.data_type(), capacity))
+            .enumerate()
+            .map(|(pos, field)| {
+                let arrow_type = schema_ref.field(pos).data_type();
+                ColumnWriter::create(field.data_type(), arrow_type, pos, 
capacity)
+            })
             .collect();
-        let field_getters = FieldGetter::create_field_getters(row_type);
         Ok(Self {
             table_schema: schema_ref.clone(),
-            arrow_column_builders: builders?,
-            field_getters,
+            column_writers: writers?,
             records_count: 0,
         })
     }
-
-    fn create_builder(
-        data_type: &arrow_schema::DataType,
-        capacity: usize,
-    ) -> Result<Box<dyn ArrayBuilder>> {
-        match data_type {
-            arrow_schema::DataType::Int8 => 
Ok(Box::new(Int8Builder::with_capacity(capacity))),
-            arrow_schema::DataType::Int16 => 
Ok(Box::new(Int16Builder::with_capacity(capacity))),
-            arrow_schema::DataType::Int32 => 
Ok(Box::new(Int32Builder::with_capacity(capacity))),
-            arrow_schema::DataType::Int64 => 
Ok(Box::new(Int64Builder::with_capacity(capacity))),
-            arrow_schema::DataType::UInt8 => 
Ok(Box::new(UInt8Builder::with_capacity(capacity))),
-            arrow_schema::DataType::UInt16 => 
Ok(Box::new(UInt16Builder::with_capacity(capacity))),
-            arrow_schema::DataType::UInt32 => 
Ok(Box::new(UInt32Builder::with_capacity(capacity))),
-            arrow_schema::DataType::UInt64 => 
Ok(Box::new(UInt64Builder::with_capacity(capacity))),
-            arrow_schema::DataType::Float32 => {
-                Ok(Box::new(Float32Builder::with_capacity(capacity)))
-            }
-            arrow_schema::DataType::Float64 => {
-                Ok(Box::new(Float64Builder::with_capacity(capacity)))
-            }
-            arrow_schema::DataType::Boolean => {
-                Ok(Box::new(BooleanBuilder::with_capacity(capacity)))
-            }
-            arrow_schema::DataType::Utf8 => 
Ok(Box::new(StringBuilder::with_capacity(
-                capacity,
-                capacity * VARIABLE_WIDTH_AVG_BYTES,
-            ))),
-            arrow_schema::DataType::Binary => 
Ok(Box::new(BinaryBuilder::with_capacity(
-                capacity,
-                capacity * VARIABLE_WIDTH_AVG_BYTES,
-            ))),
-            arrow_schema::DataType::FixedSizeBinary(size) => Ok(Box::new(
-                FixedSizeBinaryBuilder::with_capacity(capacity, *size),
-            )),
-            arrow_schema::DataType::Decimal128(precision, scale) => {
-                let builder = Decimal128Builder::with_capacity(capacity)
-                    .with_precision_and_scale(*precision, *scale)
-                    .map_err(|e| Error::IllegalArgument {
-                        message: format!(
-                            "Invalid decimal precision {precision} or scale 
{scale}: {e}"
-                        ),
-                    })?;
-                Ok(Box::new(builder))
-            }
-            arrow_schema::DataType::Date32 => 
Ok(Box::new(Date32Builder::with_capacity(capacity))),
-            arrow_schema::DataType::Time32(unit) => match unit {
-                arrow_schema::TimeUnit::Second => {
-                    Ok(Box::new(Time32SecondBuilder::with_capacity(capacity)))
-                }
-                arrow_schema::TimeUnit::Millisecond => {
-                    
Ok(Box::new(Time32MillisecondBuilder::with_capacity(capacity)))
-                }
-                _ => Err(Error::IllegalArgument {
-                    message: format!(
-                        "Time32 only supports Second and Millisecond units, 
got: {unit:?}"
-                    ),
-                }),
-            },
-            arrow_schema::DataType::Time64(unit) => match unit {
-                arrow_schema::TimeUnit::Microsecond => {
-                    
Ok(Box::new(Time64MicrosecondBuilder::with_capacity(capacity)))
-                }
-                arrow_schema::TimeUnit::Nanosecond => {
-                    
Ok(Box::new(Time64NanosecondBuilder::with_capacity(capacity)))
-                }
-                _ => Err(Error::IllegalArgument {
-                    message: format!(
-                        "Time64 only supports Microsecond and Nanosecond 
units, got: {unit:?}"
-                    ),
-                }),
-            },
-            arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Second, 
_) => {
-                Ok(Box::new(TimestampSecondBuilder::with_capacity(capacity)))
-            }
-            
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => Ok(
-                Box::new(TimestampMillisecondBuilder::with_capacity(capacity)),
-            ),
-            
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => Ok(
-                Box::new(TimestampMicrosecondBuilder::with_capacity(capacity)),
-            ),
-            
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => Ok(
-                Box::new(TimestampNanosecondBuilder::with_capacity(capacity)),
-            ),
-            dt => Err(Error::IllegalArgument {
-                message: format!("Unsupported data type: {dt:?}"),
-            }),
-        }
-    }
 }
 
 impl ArrowRecordBatchInnerBuilder for RowAppendRecordBatchBuilder {
     fn build_arrow_record_batch(&mut self) -> Result<Arc<RecordBatch>> {
         let arrays: Result<Vec<ArrayRef>> = self
-            .arrow_column_builders
+            .column_writers
             .iter_mut()
             .enumerate()
-            .map(|(idx, b)| {
-                let array = b.finish();
+            .map(|(idx, writer)| {
+                let array = writer.finish();
                 let expected_type = self.table_schema.field(idx).data_type();
 
                 // Validate array type matches schema
@@ -379,17 +281,8 @@ impl ArrowRecordBatchInnerBuilder for 
RowAppendRecordBatchBuilder {
     }
 
     fn append(&mut self, row: &dyn InternalRow) -> Result<bool> {
-        for (idx, getter) in self.field_getters.iter().enumerate() {
-            let datum = getter.get_field(row)?;
-            let field_type = self.table_schema.field(idx).data_type();
-            let builder =
-                self.arrow_column_builders
-                    .get_mut(idx)
-                    .ok_or_else(|| Error::UnexpectedError {
-                        message: format!("Column builder at index {idx} not 
found."),
-                        source: None,
-                    })?;
-            datum.append_to(builder, field_type)?;
+        for writer in &mut self.column_writers {
+            writer.write_field(row)?;
         }
         self.records_count += 1;
         Ok(true)
@@ -415,9 +308,9 @@ impl ArrowRecordBatchInnerBuilder for 
RowAppendRecordBatchBuilder {
         // Returns the uncompressed Arrow array memory size (same as Java's 
arrowWriter.estimatedSizeInBytes()).
         // Note: This is the size before compression. After build(), the 
actual size may be smaller
         // if compression is enabled.
-        self.arrow_column_builders
+        self.column_writers
             .iter()
-            .map(|builder| builder.finish_cloned().get_array_memory_size())
+            .map(|writer| writer.finish_cloned().get_array_memory_size())
             .sum()
     }
 }
@@ -1722,23 +1615,27 @@ mod tests {
 
     #[test]
     fn test_temporal_and_decimal_builder_validation() {
+        use crate::row::column_writer::ColumnWriter;
         use arrow::array::Array;
 
         // Test valid builder creation with precision=10, scale=2
-        let mut builder =
-            
RowAppendRecordBatchBuilder::create_builder(&ArrowDataType::Decimal128(10, 2), 
256)
-                .unwrap();
-        let decimal_builder = builder
-            .as_any_mut()
-            .downcast_mut::<Decimal128Builder>()
-            .expect("Expected Decimal128Builder");
-        // Verify precision and scale
-        let array = decimal_builder.finish();
+        let mut writer = ColumnWriter::create(
+            &DataTypes::decimal(10, 2),
+            &ArrowDataType::Decimal128(10, 2),
+            0,
+            256,
+        )
+        .unwrap();
+        let array = writer.finish();
         assert_eq!(array.data_type(), &ArrowDataType::Decimal128(10, 2));
 
-        // Test error case: invalid precision/scale
-        let result =
-            
RowAppendRecordBatchBuilder::create_builder(&ArrowDataType::Decimal128(100, 
50), 256);
+        // Test error case: invalid Arrow precision/scale (exceeds Arrow's 
limit)
+        let result = ColumnWriter::create(
+            &DataTypes::decimal(10, 2),
+            &ArrowDataType::Decimal128(100, 50),
+            0,
+            256,
+        );
         assert!(result.is_err());
     }
 
diff --git a/crates/fluss/src/row/column_writer.rs 
b/crates/fluss/src/row/column_writer.rs
new file mode 100644
index 0000000..34dd0f5
--- /dev/null
+++ b/crates/fluss/src/row/column_writer.rs
@@ -0,0 +1,771 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Typed column writers that write directly from [`InternalRow`] to concrete
+//! Arrow builders, bypassing the intermediate [`Datum`] enum and runtime
+//! `downcast_mut` dispatch.
+
+use crate::error::Error::RowConvertError;
+use crate::error::{Error, Result};
+use crate::metadata::DataType;
+use crate::row::InternalRow;
+use crate::row::datum::{
+    MICROS_PER_MILLI, MILLIS_PER_SECOND, NANOS_PER_MILLI, 
append_decimal_to_builder,
+    millis_nanos_to_micros, millis_nanos_to_nanos,
+};
+use arrow::array::{
+    ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder, 
Decimal128Builder,
+    FixedSizeBinaryBuilder, Float32Builder, Float64Builder, Int8Builder, 
Int16Builder,
+    Int32Builder, Int64Builder, StringBuilder, Time32MillisecondBuilder, 
Time32SecondBuilder,
+    Time64MicrosecondBuilder, Time64NanosecondBuilder, 
TimestampMicrosecondBuilder,
+    TimestampMillisecondBuilder, TimestampNanosecondBuilder, 
TimestampSecondBuilder,
+};
+use arrow_schema::DataType as ArrowDataType;
+
+/// Estimated average byte size for variable-width columns (Utf8, Binary).
+/// Used to pre-allocate data buffers and avoid reallocations during batch 
building.
+const VARIABLE_WIDTH_AVG_BYTES: usize = 64;
+
+/// A typed column writer that reads one column from an [`InternalRow`] and
+/// appends directly to a concrete Arrow builder — no intermediate [`Datum`],
+/// no `as_any_mut().downcast_mut()`.
+pub struct ColumnWriter {
+    pos: usize,
+    nullable: bool,
+    inner: TypedWriter,
+}
+
+enum TypedWriter {
+    Bool(BooleanBuilder),
+    Int8(Int8Builder),
+    Int16(Int16Builder),
+    Int32(Int32Builder),
+    Int64(Int64Builder),
+    Float32(Float32Builder),
+    Float64(Float64Builder),
+    Char {
+        len: usize,
+        builder: StringBuilder,
+    },
+    String(StringBuilder),
+    Bytes(BinaryBuilder),
+    Binary {
+        len: usize,
+        builder: FixedSizeBinaryBuilder,
+    },
+    Decimal128 {
+        src_precision: usize,
+        src_scale: usize,
+        target_precision: u32,
+        target_scale: i64,
+        builder: Decimal128Builder,
+    },
+    Date32(Date32Builder),
+    Time32Second(Time32SecondBuilder),
+    Time32Millisecond(Time32MillisecondBuilder),
+    Time64Microsecond(Time64MicrosecondBuilder),
+    Time64Nanosecond(Time64NanosecondBuilder),
+    TimestampNtzSecond {
+        precision: u32,
+        builder: TimestampSecondBuilder,
+    },
+    TimestampNtzMillisecond {
+        precision: u32,
+        builder: TimestampMillisecondBuilder,
+    },
+    TimestampNtzMicrosecond {
+        precision: u32,
+        builder: TimestampMicrosecondBuilder,
+    },
+    TimestampNtzNanosecond {
+        precision: u32,
+        builder: TimestampNanosecondBuilder,
+    },
+    TimestampLtzSecond {
+        precision: u32,
+        builder: TimestampSecondBuilder,
+    },
+    TimestampLtzMillisecond {
+        precision: u32,
+        builder: TimestampMillisecondBuilder,
+    },
+    TimestampLtzMicrosecond {
+        precision: u32,
+        builder: TimestampMicrosecondBuilder,
+    },
+    TimestampLtzNanosecond {
+        precision: u32,
+        builder: TimestampNanosecondBuilder,
+    },
+}
+
+/// Dispatch to the inner builder across all `TypedWriter` variants.
+/// Exhaustive matching ensures new variants won't compile without an arm.
+macro_rules! with_builder {
+    ($self:expr, $b:ident => $body:expr) => {
+        match $self {
+            TypedWriter::Bool($b) => $body,
+            TypedWriter::Int8($b) => $body,
+            TypedWriter::Int16($b) => $body,
+            TypedWriter::Int32($b) => $body,
+            TypedWriter::Int64($b) => $body,
+            TypedWriter::Float32($b) => $body,
+            TypedWriter::Float64($b) => $body,
+            TypedWriter::Char { builder: $b, .. } => $body,
+            TypedWriter::String($b) => $body,
+            TypedWriter::Bytes($b) => $body,
+            TypedWriter::Binary { builder: $b, .. } => $body,
+            TypedWriter::Decimal128 { builder: $b, .. } => $body,
+            TypedWriter::Date32($b) => $body,
+            TypedWriter::Time32Second($b) => $body,
+            TypedWriter::Time32Millisecond($b) => $body,
+            TypedWriter::Time64Microsecond($b) => $body,
+            TypedWriter::Time64Nanosecond($b) => $body,
+            TypedWriter::TimestampNtzSecond { builder: $b, .. } => $body,
+            TypedWriter::TimestampNtzMillisecond { builder: $b, .. } => $body,
+            TypedWriter::TimestampNtzMicrosecond { builder: $b, .. } => $body,
+            TypedWriter::TimestampNtzNanosecond { builder: $b, .. } => $body,
+            TypedWriter::TimestampLtzSecond { builder: $b, .. } => $body,
+            TypedWriter::TimestampLtzMillisecond { builder: $b, .. } => $body,
+            TypedWriter::TimestampLtzMicrosecond { builder: $b, .. } => $body,
+            TypedWriter::TimestampLtzNanosecond { builder: $b, .. } => $body,
+        }
+    };
+}
+
+impl ColumnWriter {
+    /// Create a column writer for the given Fluss `DataType` and Arrow
+    /// `ArrowDataType` at position `pos` with the given pre-allocation
+    /// `capacity`.
+    pub fn create(
+        fluss_type: &DataType,
+        arrow_type: &ArrowDataType,
+        pos: usize,
+        capacity: usize,
+    ) -> Result<Self> {
+        let nullable = fluss_type.is_nullable();
+
+        let inner = match fluss_type {
+            DataType::Boolean(_) => 
TypedWriter::Bool(BooleanBuilder::with_capacity(capacity)),
+            DataType::TinyInt(_) => 
TypedWriter::Int8(Int8Builder::with_capacity(capacity)),
+            DataType::SmallInt(_) => 
TypedWriter::Int16(Int16Builder::with_capacity(capacity)),
+            DataType::Int(_) => 
TypedWriter::Int32(Int32Builder::with_capacity(capacity)),
+            DataType::BigInt(_) => 
TypedWriter::Int64(Int64Builder::with_capacity(capacity)),
+            DataType::Float(_) => 
TypedWriter::Float32(Float32Builder::with_capacity(capacity)),
+            DataType::Double(_) => 
TypedWriter::Float64(Float64Builder::with_capacity(capacity)),
+            DataType::Char(t) => TypedWriter::Char {
+                len: t.length() as usize,
+                builder: StringBuilder::with_capacity(
+                    capacity,
+                    capacity.saturating_mul(VARIABLE_WIDTH_AVG_BYTES),
+                ),
+            },
+            DataType::String(_) => 
TypedWriter::String(StringBuilder::with_capacity(
+                capacity,
+                capacity.saturating_mul(VARIABLE_WIDTH_AVG_BYTES),
+            )),
+            DataType::Bytes(_) => 
TypedWriter::Bytes(BinaryBuilder::with_capacity(
+                capacity,
+                capacity.saturating_mul(VARIABLE_WIDTH_AVG_BYTES),
+            )),
+            DataType::Binary(t) => {
+                let arrow_len: i32 = t.length().try_into().map_err(|_| 
Error::IllegalArgument {
+                    message: format!(
+                        "Binary length {} exceeds Arrow's maximum (i32::MAX)",
+                        t.length()
+                    ),
+                })?;
+                TypedWriter::Binary {
+                    len: t.length(),
+                    builder: FixedSizeBinaryBuilder::with_capacity(capacity, 
arrow_len),
+                }
+            }
+            DataType::Decimal(dt) => {
+                let (target_p, target_s) = match arrow_type {
+                    ArrowDataType::Decimal128(p, s) => (*p, *s),
+                    _ => {
+                        return Err(Error::IllegalArgument {
+                            message: format!(
+                                "Expected Decimal128 Arrow type for Decimal, 
got: {arrow_type:?}"
+                            ),
+                        });
+                    }
+                };
+                if target_s < 0 {
+                    return Err(Error::IllegalArgument {
+                        message: format!("Negative decimal scale {target_s} is 
not supported"),
+                    });
+                }
+                let builder = Decimal128Builder::with_capacity(capacity)
+                    .with_precision_and_scale(target_p, target_s)
+                    .map_err(|e| Error::IllegalArgument {
+                        message: format!(
+                            "Invalid decimal precision {target_p} or scale 
{target_s}: {e}"
+                        ),
+                    })?;
+                TypedWriter::Decimal128 {
+                    src_precision: dt.precision() as usize,
+                    src_scale: dt.scale() as usize,
+                    target_precision: target_p as u32,
+                    target_scale: target_s as i64,
+                    builder,
+                }
+            }
+            DataType::Date(_) => 
TypedWriter::Date32(Date32Builder::with_capacity(capacity)),
+            DataType::Time(_) => match arrow_type {
+                ArrowDataType::Time32(arrow_schema::TimeUnit::Second) => {
+                    
TypedWriter::Time32Second(Time32SecondBuilder::with_capacity(capacity))
+                }
+                ArrowDataType::Time32(arrow_schema::TimeUnit::Millisecond) => {
+                    
TypedWriter::Time32Millisecond(Time32MillisecondBuilder::with_capacity(
+                        capacity,
+                    ))
+                }
+                ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
+                    
TypedWriter::Time64Microsecond(Time64MicrosecondBuilder::with_capacity(
+                        capacity,
+                    ))
+                }
+                ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond) => {
+                    
TypedWriter::Time64Nanosecond(Time64NanosecondBuilder::with_capacity(capacity))
+                }
+                _ => {
+                    return Err(Error::IllegalArgument {
+                        message: format!("Unsupported Arrow type for Time: 
{arrow_type:?}"),
+                    });
+                }
+            },
+            DataType::Timestamp(t) => {
+                let precision = t.precision();
+                match arrow_type {
+                    ArrowDataType::Timestamp(arrow_schema::TimeUnit::Second, 
_) => {
+                        TypedWriter::TimestampNtzSecond {
+                            precision,
+                            builder: 
TimestampSecondBuilder::with_capacity(capacity),
+                        }
+                    }
+                    
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => {
+                        TypedWriter::TimestampNtzMillisecond {
+                            precision,
+                            builder: 
TimestampMillisecondBuilder::with_capacity(capacity),
+                        }
+                    }
+                    
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => {
+                        TypedWriter::TimestampNtzMicrosecond {
+                            precision,
+                            builder: 
TimestampMicrosecondBuilder::with_capacity(capacity),
+                        }
+                    }
+                    
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => {
+                        TypedWriter::TimestampNtzNanosecond {
+                            precision,
+                            builder: 
TimestampNanosecondBuilder::with_capacity(capacity),
+                        }
+                    }
+                    _ => {
+                        return Err(Error::IllegalArgument {
+                            message: format!(
+                                "Unsupported Arrow type for Timestamp: 
{arrow_type:?}"
+                            ),
+                        });
+                    }
+                }
+            }
+            DataType::TimestampLTz(t) => {
+                let precision = t.precision();
+                match arrow_type {
+                    ArrowDataType::Timestamp(arrow_schema::TimeUnit::Second, 
_) => {
+                        TypedWriter::TimestampLtzSecond {
+                            precision,
+                            builder: 
TimestampSecondBuilder::with_capacity(capacity),
+                        }
+                    }
+                    
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => {
+                        TypedWriter::TimestampLtzMillisecond {
+                            precision,
+                            builder: 
TimestampMillisecondBuilder::with_capacity(capacity),
+                        }
+                    }
+                    
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => {
+                        TypedWriter::TimestampLtzMicrosecond {
+                            precision,
+                            builder: 
TimestampMicrosecondBuilder::with_capacity(capacity),
+                        }
+                    }
+                    
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => {
+                        TypedWriter::TimestampLtzNanosecond {
+                            precision,
+                            builder: 
TimestampNanosecondBuilder::with_capacity(capacity),
+                        }
+                    }
+                    _ => {
+                        return Err(Error::IllegalArgument {
+                            message: format!(
+                                "Unsupported Arrow type for TimestampLTz: 
{arrow_type:?}"
+                            ),
+                        });
+                    }
+                }
+            }
+            _ => {
+                return Err(Error::IllegalArgument {
+                    message: format!("Unsupported Fluss DataType: 
{fluss_type:?}"),
+                });
+            }
+        };
+
+        Ok(Self {
+            pos,
+            nullable,
+            inner,
+        })
+    }
+
+    /// Read one value from `row` at this writer's column position and append 
it
+    /// directly to the concrete Arrow builder.
+    #[inline]
+    pub fn write_field(&mut self, row: &dyn InternalRow) -> Result<()> {
+        if self.nullable && row.is_null_at(self.pos)? {
+            self.append_null();
+            return Ok(());
+        }
+        self.write_non_null(row)
+    }
+
+    /// Finish the builder, producing the final Arrow array.
+    pub fn finish(&mut self) -> ArrayRef {
+        self.as_builder_mut().finish()
+    }
+
+    /// Clone-finish the builder for size estimation (does not reset the 
builder).
+    pub fn finish_cloned(&self) -> ArrayRef {
+        self.as_builder_ref().finish_cloned()
+    }
+
+    fn append_null(&mut self) {
+        with_builder!(&mut self.inner, b => b.append_null());
+    }
+
+    /// Returns a trait-object reference to the inner builder.
+    /// Used for type-agnostic operations (`finish`, `finish_cloned`).
+    fn as_builder_mut(&mut self) -> &mut dyn ArrayBuilder {
+        with_builder!(&mut self.inner, b => b)
+    }
+
+    fn as_builder_ref(&self) -> &dyn ArrayBuilder {
+        with_builder!(&self.inner, b => b)
+    }
+
+    #[inline]
+    fn write_non_null(&mut self, row: &dyn InternalRow) -> Result<()> {
+        let pos = self.pos;
+
+        match &mut self.inner {
+            TypedWriter::Bool(b) => {
+                b.append_value(row.get_boolean(pos)?);
+                Ok(())
+            }
+            TypedWriter::Int8(b) => {
+                b.append_value(row.get_byte(pos)?);
+                Ok(())
+            }
+            TypedWriter::Int16(b) => {
+                b.append_value(row.get_short(pos)?);
+                Ok(())
+            }
+            TypedWriter::Int32(b) => {
+                b.append_value(row.get_int(pos)?);
+                Ok(())
+            }
+            TypedWriter::Int64(b) => {
+                b.append_value(row.get_long(pos)?);
+                Ok(())
+            }
+            TypedWriter::Float32(b) => {
+                b.append_value(row.get_float(pos)?);
+                Ok(())
+            }
+            TypedWriter::Float64(b) => {
+                b.append_value(row.get_double(pos)?);
+                Ok(())
+            }
+            TypedWriter::Char { len, builder } => {
+                let v = row.get_char(pos, *len)?;
+                builder.append_value(v);
+                Ok(())
+            }
+            TypedWriter::String(b) => {
+                let v = row.get_string(pos)?;
+                b.append_value(v);
+                Ok(())
+            }
+            TypedWriter::Bytes(b) => {
+                let v = row.get_bytes(pos)?;
+                b.append_value(v);
+                Ok(())
+            }
+            TypedWriter::Binary { len, builder } => {
+                let v = row.get_binary(pos, *len)?;
+                builder.append_value(v).map_err(|e| RowConvertError {
+                    message: format!("Failed to append binary value: {e}"),
+                })?;
+                Ok(())
+            }
+            TypedWriter::Decimal128 {
+                src_precision,
+                src_scale,
+                target_precision,
+                target_scale,
+                builder,
+            } => {
+                let decimal = row.get_decimal(pos, *src_precision, 
*src_scale)?;
+                append_decimal_to_builder(&decimal, *target_precision, 
*target_scale, builder)
+            }
+            TypedWriter::Date32(b) => {
+                let date = row.get_date(pos)?;
+                b.append_value(date.get_inner());
+                Ok(())
+            }
+            TypedWriter::Time32Second(b) => {
+                let millis = row.get_time(pos)?.get_inner();
+                if millis % MILLIS_PER_SECOND as i32 != 0 {
+                    return Err(RowConvertError {
+                        message: format!(
+                            "Time value {millis} ms has sub-second precision 
but schema expects seconds only"
+                        ),
+                    });
+                }
+                b.append_value(millis / MILLIS_PER_SECOND as i32);
+                Ok(())
+            }
+            TypedWriter::Time32Millisecond(b) => {
+                b.append_value(row.get_time(pos)?.get_inner());
+                Ok(())
+            }
+            TypedWriter::Time64Microsecond(b) => {
+                let millis = row.get_time(pos)?.get_inner();
+                let micros = (millis as i64)
+                    .checked_mul(MICROS_PER_MILLI)
+                    .ok_or_else(|| RowConvertError {
+                        message: format!(
+                            "Time value {millis} ms overflows when converting 
to microseconds"
+                        ),
+                    })?;
+                b.append_value(micros);
+                Ok(())
+            }
+            TypedWriter::Time64Nanosecond(b) => {
+                let millis = row.get_time(pos)?.get_inner();
+                let nanos = (millis as i64)
+                    .checked_mul(NANOS_PER_MILLI)
+                    .ok_or_else(|| RowConvertError {
+                        message: format!(
+                            "Time value {millis} ms overflows when converting 
to nanoseconds"
+                        ),
+                    })?;
+                b.append_value(nanos);
+                Ok(())
+            }
+            // --- TimestampNtz variants ---
+            TypedWriter::TimestampNtzSecond {
+                precision, builder, ..
+            } => {
+                let ts = row.get_timestamp_ntz(pos, *precision)?;
+                builder.append_value(ts.get_millisecond() / MILLIS_PER_SECOND);
+                Ok(())
+            }
+            TypedWriter::TimestampNtzMillisecond {
+                precision, builder, ..
+            } => {
+                let ts = row.get_timestamp_ntz(pos, *precision)?;
+                builder.append_value(ts.get_millisecond());
+                Ok(())
+            }
+            TypedWriter::TimestampNtzMicrosecond {
+                precision, builder, ..
+            } => {
+                let ts = row.get_timestamp_ntz(pos, *precision)?;
+                builder.append_value(millis_nanos_to_micros(
+                    ts.get_millisecond(),
+                    ts.get_nano_of_millisecond(),
+                )?);
+                Ok(())
+            }
+            TypedWriter::TimestampNtzNanosecond {
+                precision, builder, ..
+            } => {
+                let ts = row.get_timestamp_ntz(pos, *precision)?;
+                builder.append_value(millis_nanos_to_nanos(
+                    ts.get_millisecond(),
+                    ts.get_nano_of_millisecond(),
+                )?);
+                Ok(())
+            }
+            // --- TimestampLtz variants ---
+            TypedWriter::TimestampLtzSecond {
+                precision, builder, ..
+            } => {
+                let ts = row.get_timestamp_ltz(pos, *precision)?;
+                builder.append_value(ts.get_epoch_millisecond() / 
MILLIS_PER_SECOND);
+                Ok(())
+            }
+            TypedWriter::TimestampLtzMillisecond {
+                precision, builder, ..
+            } => {
+                let ts = row.get_timestamp_ltz(pos, *precision)?;
+                builder.append_value(ts.get_epoch_millisecond());
+                Ok(())
+            }
+            TypedWriter::TimestampLtzMicrosecond {
+                precision, builder, ..
+            } => {
+                let ts = row.get_timestamp_ltz(pos, *precision)?;
+                builder.append_value(millis_nanos_to_micros(
+                    ts.get_epoch_millisecond(),
+                    ts.get_nano_of_millisecond(),
+                )?);
+                Ok(())
+            }
+            TypedWriter::TimestampLtzNanosecond {
+                precision, builder, ..
+            } => {
+                let ts = row.get_timestamp_ltz(pos, *precision)?;
+                builder.append_value(millis_nanos_to_nanos(
+                    ts.get_epoch_millisecond(),
+                    ts.get_nano_of_millisecond(),
+                )?);
+                Ok(())
+            }
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::metadata::DataTypes;
+    use crate::record::to_arrow_type;
+    use crate::row::{Date, Datum, GenericRow, Time, TimestampLtz, 
TimestampNtz};
+    use arrow::array::*;
+    use bigdecimal::BigDecimal;
+    use std::str::FromStr;
+
+    /// Helper: create a ColumnWriter from a Fluss DataType, deriving the 
Arrow type automatically.
+    fn writer_for(fluss_type: &DataType, capacity: usize) -> ColumnWriter {
+        let arrow_type = to_arrow_type(fluss_type).unwrap();
+        ColumnWriter::create(fluss_type, &arrow_type, 0, capacity).unwrap()
+    }
+
+    /// Helper: write a single datum and return the finished array.
+    fn write_one(fluss_type: &DataType, datum: Datum) -> ArrayRef {
+        let mut w = writer_for(fluss_type, 4);
+        w.write_field(&GenericRow::from_data(vec![datum])).unwrap();
+        w.finish()
+    }
+
+    #[test]
+    fn write_all_scalar_types() {
+        // Boolean
+        let arr = write_one(&DataTypes::boolean(), Datum::Bool(true));
+        assert!(
+            arr.as_any()
+                .downcast_ref::<BooleanArray>()
+                .unwrap()
+                .value(0)
+        );
+
+        // Integer types
+        let arr = write_one(&DataTypes::tinyint(), Datum::Int8(42));
+        assert_eq!(
+            arr.as_any().downcast_ref::<Int8Array>().unwrap().value(0),
+            42
+        );
+
+        let arr = write_one(&DataTypes::smallint(), Datum::Int16(1000));
+        assert_eq!(
+            arr.as_any().downcast_ref::<Int16Array>().unwrap().value(0),
+            1000
+        );
+
+        let arr = write_one(&DataTypes::int(), Datum::Int32(100_000));
+        assert_eq!(
+            arr.as_any().downcast_ref::<Int32Array>().unwrap().value(0),
+            100_000
+        );
+
+        let arr = write_one(&DataTypes::bigint(), Datum::Int64(9_000_000_000));
+        assert_eq!(
+            arr.as_any().downcast_ref::<Int64Array>().unwrap().value(0),
+            9_000_000_000
+        );
+
+        // Float types
+        let arr = write_one(&DataTypes::float(), Datum::Float32(1.5.into()));
+        assert!(
+            (arr.as_any()
+                .downcast_ref::<Float32Array>()
+                .unwrap()
+                .value(0)
+                - 1.5)
+                .abs()
+                < 0.001
+        );
+
+        let arr = write_one(&DataTypes::double(), 
Datum::Float64(1.125.into()));
+        assert!(
+            (arr.as_any()
+                .downcast_ref::<Float64Array>()
+                .unwrap()
+                .value(0)
+                - 1.125)
+                .abs()
+                < 0.001
+        );
+
+        // String / Char
+        let arr = write_one(&DataTypes::string(), 
Datum::String("hello".into()));
+        assert_eq!(
+            arr.as_any().downcast_ref::<StringArray>().unwrap().value(0),
+            "hello"
+        );
+
+        let arr = write_one(&DataTypes::char(10), 
Datum::String("world".into()));
+        assert_eq!(
+            arr.as_any().downcast_ref::<StringArray>().unwrap().value(0),
+            "world"
+        );
+
+        // Bytes / Binary
+        let arr = write_one(&DataTypes::bytes(), Datum::Blob(vec![1, 2, 
3].into()));
+        assert_eq!(
+            arr.as_any().downcast_ref::<BinaryArray>().unwrap().value(0),
+            &[1, 2, 3]
+        );
+
+        let arr = write_one(
+            &DataTypes::binary(4),
+            Datum::Blob(vec![10, 20, 30, 40].into()),
+        );
+        assert_eq!(
+            arr.as_any()
+                .downcast_ref::<FixedSizeBinaryArray>()
+                .unwrap()
+                .value(0),
+            &[10, 20, 30, 40]
+        );
+
+        // Date
+        let arr = write_one(&DataTypes::date(), Datum::Date(Date::new(19000)));
+        assert_eq!(
+            arr.as_any().downcast_ref::<Date32Array>().unwrap().value(0),
+            19000
+        );
+
+        // Time (precision 3 → Millisecond)
+        let arr = write_one(
+            &DataTypes::time_with_precision(3),
+            Datum::Time(Time::new(45_000)),
+        );
+        assert_eq!(
+            arr.as_any()
+                .downcast_ref::<Time32MillisecondArray>()
+                .unwrap()
+                .value(0),
+            45_000
+        );
+
+        // Decimal
+        let decimal =
+            
crate::row::Decimal::from_big_decimal(BigDecimal::from_str("123.45").unwrap(), 
10, 2)
+                .unwrap();
+        let arr = write_one(&DataTypes::decimal(10, 2), 
Datum::Decimal(decimal));
+        assert_eq!(
+            arr.as_any()
+                .downcast_ref::<Decimal128Array>()
+                .unwrap()
+                .value(0),
+            12345
+        );
+
+        // Timestamp NTZ (precision 3 → Millisecond)
+        let arr = write_one(
+            &DataTypes::timestamp_with_precision(3),
+            Datum::TimestampNtz(TimestampNtz::new(1_700_000_000_000)),
+        );
+        assert_eq!(
+            arr.as_any()
+                .downcast_ref::<TimestampMillisecondArray>()
+                .unwrap()
+                .value(0),
+            1_700_000_000_000
+        );
+
+        // Timestamp LTZ (precision 3 → Millisecond)
+        let arr = write_one(
+            &DataTypes::timestamp_ltz_with_precision(3),
+            Datum::TimestampLtz(TimestampLtz::new(1_700_000_000_000)),
+        );
+        assert_eq!(
+            arr.as_any()
+                .downcast_ref::<TimestampMillisecondArray>()
+                .unwrap()
+                .value(0),
+            1_700_000_000_000
+        );
+    }
+
+    #[test]
+    fn write_null_and_multiple_rows() {
+        // Null
+        let arr = write_one(&DataTypes::int(), Datum::Null);
+        assert!(arr.is_null(0));
+
+        // Multiple rows
+        let mut w = writer_for(&DataTypes::int(), 8);
+        for val in [10, 20, 30] {
+            w.write_field(&GenericRow::from_data(vec![val])).unwrap();
+        }
+        let arr = w.finish();
+        let int_arr = arr.as_any().downcast_ref::<Int32Array>().unwrap();
+        assert_eq!(int_arr.len(), 3);
+        assert_eq!(int_arr.value(0), 10);
+        assert_eq!(int_arr.value(1), 20);
+        assert_eq!(int_arr.value(2), 30);
+
+        // finish_cloned does not reset
+        let mut w = writer_for(&DataTypes::int(), 4);
+        w.write_field(&GenericRow::from_data(vec![42_i32])).unwrap();
+        assert_eq!(w.finish_cloned().len(), 1);
+        w.write_field(&GenericRow::from_data(vec![99_i32])).unwrap();
+        let int_arr = w
+            .finish()
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap()
+            .clone();
+        assert_eq!((int_arr.value(0), int_arr.value(1)), (42, 99));
+    }
+
+    #[test]
+    fn unsupported_type_returns_error() {
+        let fluss_type = DataTypes::array(DataTypes::int());
+        let arrow_type = ArrowDataType::List(arrow_schema::FieldRef::new(
+            arrow_schema::Field::new("item", ArrowDataType::Int32, true),
+        ));
+        assert!(ColumnWriter::create(&fluss_type, &arrow_type, 0, 4).is_err());
+    }
+}
diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs
index b370fb1..9b2e80a 100644
--- a/crates/fluss/src/row/datum.rs
+++ b/crates/fluss/src/row/datum.rs
@@ -397,13 +397,13 @@ pub trait ToArrow {
 }
 
 // Time unit conversion constants
-const MILLIS_PER_SECOND: i64 = 1_000;
-const MICROS_PER_MILLI: i64 = 1_000;
-const NANOS_PER_MILLI: i64 = 1_000_000;
+pub(crate) const MILLIS_PER_SECOND: i64 = 1_000;
+pub(crate) const MICROS_PER_MILLI: i64 = 1_000;
+pub(crate) const NANOS_PER_MILLI: i64 = 1_000_000;
 
 /// Converts milliseconds and nanoseconds-within-millisecond to total 
microseconds.
 /// Returns an error if the conversion would overflow.
-fn millis_nanos_to_micros(millis: i64, nanos: i32) -> Result<i64> {
+pub(crate) fn millis_nanos_to_micros(millis: i64, nanos: i32) -> Result<i64> {
     let millis_micros = millis
         .checked_mul(MICROS_PER_MILLI)
         .ok_or_else(|| RowConvertError {
@@ -423,7 +423,7 @@ fn millis_nanos_to_micros(millis: i64, nanos: i32) -> 
Result<i64> {
 
 /// Converts milliseconds and nanoseconds-within-millisecond to total 
nanoseconds.
 /// Returns an error if the conversion would overflow.
-fn millis_nanos_to_nanos(millis: i64, nanos: i32) -> Result<i64> {
+pub(crate) fn millis_nanos_to_nanos(millis: i64, nanos: i32) -> Result<i64> {
     let millis_nanos = millis
         .checked_mul(NANOS_PER_MILLI)
         .ok_or_else(|| RowConvertError {
@@ -440,6 +440,42 @@ fn millis_nanos_to_nanos(millis: i64, nanos: i32) -> 
Result<i64> {
         })
 }
 
+/// Rescales a [`Decimal`] to the given Arrow target precision/scale and 
appends
+/// the resulting i128 to the builder.
+pub(crate) fn append_decimal_to_builder(
+    decimal: &Decimal,
+    target_precision: u32,
+    target_scale: i64,
+    builder: &mut Decimal128Builder,
+) -> Result<()> {
+    use bigdecimal::RoundingMode;
+
+    let bd = decimal.to_big_decimal();
+    let rescaled = bd.with_scale_round(target_scale, RoundingMode::HalfUp);
+    let (unscaled, _) = rescaled.as_bigint_and_exponent();
+
+    let actual_precision = Decimal::compute_precision(&unscaled);
+    if actual_precision > target_precision as usize {
+        return Err(RowConvertError {
+            message: format!(
+                "Decimal precision overflow: value has {actual_precision} 
digits but Arrow expects {target_precision} (value: {rescaled})"
+            ),
+        });
+    }
+
+    let i128_val: i128 = match unscaled.try_into() {
+        Ok(v) => v,
+        Err(_) => {
+            return Err(RowConvertError {
+                message: format!("Decimal value exceeds i128 range: 
{rescaled}"),
+            });
+        }
+    };
+
+    builder.append_value(i128_val);
+    Ok(())
+}
+
 trait AppendResult {
     fn into_append_result(self) -> Result<()>;
 }
@@ -539,45 +575,14 @@ impl Datum<'_> {
                     }
                 };
 
-                // Validate scale is non-negative (Fluss doesn't support 
negative scales)
                 if s < 0 {
                     return Err(RowConvertError {
                         message: format!("Negative decimal scale {s} is not 
supported"),
                     });
                 }
 
-                let target_precision = p as u32;
-                let target_scale = s as i64; // Safe now: 0..127 → 0i64..127i64
-
                 if let Some(b) = 
builder.as_any_mut().downcast_mut::<Decimal128Builder>() {
-                    use bigdecimal::RoundingMode;
-
-                    // Rescale the decimal to match Arrow's target scale
-                    let bd = decimal.to_big_decimal();
-                    let rescaled = bd.with_scale_round(target_scale, 
RoundingMode::HalfUp);
-                    let (unscaled, _) = rescaled.as_bigint_and_exponent();
-
-                    // Validate precision
-                    let actual_precision = 
Decimal::compute_precision(&unscaled);
-                    if actual_precision > target_precision as usize {
-                        return Err(RowConvertError {
-                            message: format!(
-                                "Decimal precision overflow: value has 
{actual_precision} digits but Arrow expects {target_precision} (value: 
{rescaled})"
-                            ),
-                        });
-                    }
-
-                    // Convert to i128 for Arrow
-                    let i128_val: i128 = match unscaled.try_into() {
-                        Ok(v) => v,
-                        Err(_) => {
-                            return Err(RowConvertError {
-                                message: format!("Decimal value exceeds i128 
range: {rescaled}"),
-                            });
-                        }
-                    };
-
-                    b.append_value(i128_val);
+                    append_decimal_to_builder(decimal, p as u32, s as i64, b)?;
                     return Ok(());
                 }
 
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index 8fb777d..ef99ba2 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -17,10 +17,11 @@
 
 mod column;
 
-mod datum;
+pub(crate) mod datum;
 mod decimal;
 
 pub mod binary;
+pub(crate) mod column_writer;
 pub mod compacted;
 pub mod encode;
 pub mod field_getter;


Reply via email to