This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 599a223 chore: refactor to typed Column Writers (#440)
599a223 is described below
commit 599a2235837495f8002eb8dab10ec33917f62304
Author: Anton Borisov <[email protected]>
AuthorDate: Wed Mar 11 13:23:26 2026 +0000
chore: refactor to typed Column Writers (#440)
---
crates/fluss/src/record/arrow.rs | 169 ++------
crates/fluss/src/row/column_writer.rs | 771 ++++++++++++++++++++++++++++++++++
crates/fluss/src/row/datum.rs | 79 ++--
crates/fluss/src/row/mod.rs | 3 +-
4 files changed, 848 insertions(+), 174 deletions(-)
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index 7fd1619..a0dfc84 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -20,16 +20,9 @@ use crate::compression::ArrowCompressionInfo;
use crate::error::{Error, Result};
use crate::metadata::{DataType, RowType};
use crate::record::{ChangeType, ScanRecord};
-use crate::row::field_getter::FieldGetter;
+use crate::row::column_writer::ColumnWriter;
use crate::row::{ColumnarRow, InternalRow};
-use arrow::array::{
- ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder,
Decimal128Builder,
- FixedSizeBinaryBuilder, Float32Builder, Float64Builder, Int8Builder,
Int16Builder,
- Int32Builder, Int64Builder, StringBuilder, Time32MillisecondBuilder,
Time32SecondBuilder,
- Time64MicrosecondBuilder, Time64NanosecondBuilder,
TimestampMicrosecondBuilder,
- TimestampMillisecondBuilder, TimestampNanosecondBuilder,
TimestampSecondBuilder, UInt8Builder,
- UInt16Builder, UInt32Builder, UInt64Builder,
-};
+use arrow::array::{ArrayBuilder, ArrayRef};
use arrow::{
array::RecordBatch,
buffer::Buffer,
@@ -154,10 +147,6 @@ pub const BUILDER_DEFAULT_OFFSET: i64 = 0;
// TODO: Switch to byte-size-based is_full() like Java's ArrowWriter instead
of a hard record cap.
pub const DEFAULT_MAX_RECORD: i32 = 256;
-/// Estimated average byte size for variable-width columns (Utf8, Binary).
-/// Used to pre-allocate data buffers and avoid reallocations during batch
building.
-const VARIABLE_WIDTH_AVG_BYTES: usize = 64;
-
pub struct MemoryLogRecordsArrowBuilder {
base_log_offset: i64,
schema_id: i32,
@@ -234,8 +223,7 @@ impl ArrowRecordBatchInnerBuilder for
PrebuiltRecordBatchBuilder {
pub struct RowAppendRecordBatchBuilder {
table_schema: SchemaRef,
- arrow_column_builders: Vec<Box<dyn ArrayBuilder>>,
- field_getters: Box<[FieldGetter]>,
+ column_writers: Vec<ColumnWriter>,
records_count: i32,
}
@@ -243,117 +231,31 @@ impl RowAppendRecordBatchBuilder {
pub fn new(row_type: &RowType) -> Result<Self> {
let capacity = DEFAULT_MAX_RECORD as usize;
let schema_ref = to_arrow_schema(row_type)?;
- let builders: Result<Vec<_>> = schema_ref
+ let writers: Result<Vec<_>> = row_type
.fields()
.iter()
- .map(|field| Self::create_builder(field.data_type(), capacity))
+ .enumerate()
+ .map(|(pos, field)| {
+ let arrow_type = schema_ref.field(pos).data_type();
+ ColumnWriter::create(field.data_type(), arrow_type, pos,
capacity)
+ })
.collect();
- let field_getters = FieldGetter::create_field_getters(row_type);
Ok(Self {
table_schema: schema_ref.clone(),
- arrow_column_builders: builders?,
- field_getters,
+ column_writers: writers?,
records_count: 0,
})
}
-
- fn create_builder(
- data_type: &arrow_schema::DataType,
- capacity: usize,
- ) -> Result<Box<dyn ArrayBuilder>> {
- match data_type {
- arrow_schema::DataType::Int8 =>
Ok(Box::new(Int8Builder::with_capacity(capacity))),
- arrow_schema::DataType::Int16 =>
Ok(Box::new(Int16Builder::with_capacity(capacity))),
- arrow_schema::DataType::Int32 =>
Ok(Box::new(Int32Builder::with_capacity(capacity))),
- arrow_schema::DataType::Int64 =>
Ok(Box::new(Int64Builder::with_capacity(capacity))),
- arrow_schema::DataType::UInt8 =>
Ok(Box::new(UInt8Builder::with_capacity(capacity))),
- arrow_schema::DataType::UInt16 =>
Ok(Box::new(UInt16Builder::with_capacity(capacity))),
- arrow_schema::DataType::UInt32 =>
Ok(Box::new(UInt32Builder::with_capacity(capacity))),
- arrow_schema::DataType::UInt64 =>
Ok(Box::new(UInt64Builder::with_capacity(capacity))),
- arrow_schema::DataType::Float32 => {
- Ok(Box::new(Float32Builder::with_capacity(capacity)))
- }
- arrow_schema::DataType::Float64 => {
- Ok(Box::new(Float64Builder::with_capacity(capacity)))
- }
- arrow_schema::DataType::Boolean => {
- Ok(Box::new(BooleanBuilder::with_capacity(capacity)))
- }
- arrow_schema::DataType::Utf8 =>
Ok(Box::new(StringBuilder::with_capacity(
- capacity,
- capacity * VARIABLE_WIDTH_AVG_BYTES,
- ))),
- arrow_schema::DataType::Binary =>
Ok(Box::new(BinaryBuilder::with_capacity(
- capacity,
- capacity * VARIABLE_WIDTH_AVG_BYTES,
- ))),
- arrow_schema::DataType::FixedSizeBinary(size) => Ok(Box::new(
- FixedSizeBinaryBuilder::with_capacity(capacity, *size),
- )),
- arrow_schema::DataType::Decimal128(precision, scale) => {
- let builder = Decimal128Builder::with_capacity(capacity)
- .with_precision_and_scale(*precision, *scale)
- .map_err(|e| Error::IllegalArgument {
- message: format!(
- "Invalid decimal precision {precision} or scale
{scale}: {e}"
- ),
- })?;
- Ok(Box::new(builder))
- }
- arrow_schema::DataType::Date32 =>
Ok(Box::new(Date32Builder::with_capacity(capacity))),
- arrow_schema::DataType::Time32(unit) => match unit {
- arrow_schema::TimeUnit::Second => {
- Ok(Box::new(Time32SecondBuilder::with_capacity(capacity)))
- }
- arrow_schema::TimeUnit::Millisecond => {
-
Ok(Box::new(Time32MillisecondBuilder::with_capacity(capacity)))
- }
- _ => Err(Error::IllegalArgument {
- message: format!(
- "Time32 only supports Second and Millisecond units,
got: {unit:?}"
- ),
- }),
- },
- arrow_schema::DataType::Time64(unit) => match unit {
- arrow_schema::TimeUnit::Microsecond => {
-
Ok(Box::new(Time64MicrosecondBuilder::with_capacity(capacity)))
- }
- arrow_schema::TimeUnit::Nanosecond => {
-
Ok(Box::new(Time64NanosecondBuilder::with_capacity(capacity)))
- }
- _ => Err(Error::IllegalArgument {
- message: format!(
- "Time64 only supports Microsecond and Nanosecond
units, got: {unit:?}"
- ),
- }),
- },
- arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Second,
_) => {
- Ok(Box::new(TimestampSecondBuilder::with_capacity(capacity)))
- }
-
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => Ok(
- Box::new(TimestampMillisecondBuilder::with_capacity(capacity)),
- ),
-
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => Ok(
- Box::new(TimestampMicrosecondBuilder::with_capacity(capacity)),
- ),
-
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => Ok(
- Box::new(TimestampNanosecondBuilder::with_capacity(capacity)),
- ),
- dt => Err(Error::IllegalArgument {
- message: format!("Unsupported data type: {dt:?}"),
- }),
- }
- }
}
impl ArrowRecordBatchInnerBuilder for RowAppendRecordBatchBuilder {
fn build_arrow_record_batch(&mut self) -> Result<Arc<RecordBatch>> {
let arrays: Result<Vec<ArrayRef>> = self
- .arrow_column_builders
+ .column_writers
.iter_mut()
.enumerate()
- .map(|(idx, b)| {
- let array = b.finish();
+ .map(|(idx, writer)| {
+ let array = writer.finish();
let expected_type = self.table_schema.field(idx).data_type();
// Validate array type matches schema
@@ -379,17 +281,8 @@ impl ArrowRecordBatchInnerBuilder for
RowAppendRecordBatchBuilder {
}
fn append(&mut self, row: &dyn InternalRow) -> Result<bool> {
- for (idx, getter) in self.field_getters.iter().enumerate() {
- let datum = getter.get_field(row)?;
- let field_type = self.table_schema.field(idx).data_type();
- let builder =
- self.arrow_column_builders
- .get_mut(idx)
- .ok_or_else(|| Error::UnexpectedError {
- message: format!("Column builder at index {idx} not
found."),
- source: None,
- })?;
- datum.append_to(builder, field_type)?;
+ for writer in &mut self.column_writers {
+ writer.write_field(row)?;
}
self.records_count += 1;
Ok(true)
@@ -415,9 +308,9 @@ impl ArrowRecordBatchInnerBuilder for
RowAppendRecordBatchBuilder {
// Returns the uncompressed Arrow array memory size (same as Java's
arrowWriter.estimatedSizeInBytes()).
// Note: This is the size before compression. After build(), the
actual size may be smaller
// if compression is enabled.
- self.arrow_column_builders
+ self.column_writers
.iter()
- .map(|builder| builder.finish_cloned().get_array_memory_size())
+ .map(|writer| writer.finish_cloned().get_array_memory_size())
.sum()
}
}
@@ -1722,23 +1615,27 @@ mod tests {
#[test]
fn test_temporal_and_decimal_builder_validation() {
+ use crate::row::column_writer::ColumnWriter;
use arrow::array::Array;
// Test valid builder creation with precision=10, scale=2
- let mut builder =
-
RowAppendRecordBatchBuilder::create_builder(&ArrowDataType::Decimal128(10, 2),
256)
- .unwrap();
- let decimal_builder = builder
- .as_any_mut()
- .downcast_mut::<Decimal128Builder>()
- .expect("Expected Decimal128Builder");
- // Verify precision and scale
- let array = decimal_builder.finish();
+ let mut writer = ColumnWriter::create(
+ &DataTypes::decimal(10, 2),
+ &ArrowDataType::Decimal128(10, 2),
+ 0,
+ 256,
+ )
+ .unwrap();
+ let array = writer.finish();
assert_eq!(array.data_type(), &ArrowDataType::Decimal128(10, 2));
- // Test error case: invalid precision/scale
- let result =
-
RowAppendRecordBatchBuilder::create_builder(&ArrowDataType::Decimal128(100,
50), 256);
+ // Test error case: invalid Arrow precision/scale (exceeds Arrow's
limit)
+ let result = ColumnWriter::create(
+ &DataTypes::decimal(10, 2),
+ &ArrowDataType::Decimal128(100, 50),
+ 0,
+ 256,
+ );
assert!(result.is_err());
}
diff --git a/crates/fluss/src/row/column_writer.rs
b/crates/fluss/src/row/column_writer.rs
new file mode 100644
index 0000000..34dd0f5
--- /dev/null
+++ b/crates/fluss/src/row/column_writer.rs
@@ -0,0 +1,771 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Typed column writers that write directly from [`InternalRow`] to concrete
+//! Arrow builders, bypassing the intermediate [`Datum`] enum and runtime
+//! `downcast_mut` dispatch.
+
+use crate::error::Error::RowConvertError;
+use crate::error::{Error, Result};
+use crate::metadata::DataType;
+use crate::row::InternalRow;
+use crate::row::datum::{
+ MICROS_PER_MILLI, MILLIS_PER_SECOND, NANOS_PER_MILLI,
append_decimal_to_builder,
+ millis_nanos_to_micros, millis_nanos_to_nanos,
+};
+use arrow::array::{
+ ArrayBuilder, ArrayRef, BinaryBuilder, BooleanBuilder, Date32Builder,
Decimal128Builder,
+ FixedSizeBinaryBuilder, Float32Builder, Float64Builder, Int8Builder,
Int16Builder,
+ Int32Builder, Int64Builder, StringBuilder, Time32MillisecondBuilder,
Time32SecondBuilder,
+ Time64MicrosecondBuilder, Time64NanosecondBuilder,
TimestampMicrosecondBuilder,
+ TimestampMillisecondBuilder, TimestampNanosecondBuilder,
TimestampSecondBuilder,
+};
+use arrow_schema::DataType as ArrowDataType;
+
+/// Estimated average byte size for variable-width columns (Utf8, Binary).
+/// Used to pre-allocate data buffers and avoid reallocations during batch
building.
+const VARIABLE_WIDTH_AVG_BYTES: usize = 64;
+
+/// A typed column writer that reads one column from an [`InternalRow`] and
+/// appends directly to a concrete Arrow builder — no intermediate [`Datum`],
+/// no `as_any_mut().downcast_mut()`.
+pub struct ColumnWriter {
+ pos: usize,
+ nullable: bool,
+ inner: TypedWriter,
+}
+
+enum TypedWriter {
+ Bool(BooleanBuilder),
+ Int8(Int8Builder),
+ Int16(Int16Builder),
+ Int32(Int32Builder),
+ Int64(Int64Builder),
+ Float32(Float32Builder),
+ Float64(Float64Builder),
+ Char {
+ len: usize,
+ builder: StringBuilder,
+ },
+ String(StringBuilder),
+ Bytes(BinaryBuilder),
+ Binary {
+ len: usize,
+ builder: FixedSizeBinaryBuilder,
+ },
+ Decimal128 {
+ src_precision: usize,
+ src_scale: usize,
+ target_precision: u32,
+ target_scale: i64,
+ builder: Decimal128Builder,
+ },
+ Date32(Date32Builder),
+ Time32Second(Time32SecondBuilder),
+ Time32Millisecond(Time32MillisecondBuilder),
+ Time64Microsecond(Time64MicrosecondBuilder),
+ Time64Nanosecond(Time64NanosecondBuilder),
+ TimestampNtzSecond {
+ precision: u32,
+ builder: TimestampSecondBuilder,
+ },
+ TimestampNtzMillisecond {
+ precision: u32,
+ builder: TimestampMillisecondBuilder,
+ },
+ TimestampNtzMicrosecond {
+ precision: u32,
+ builder: TimestampMicrosecondBuilder,
+ },
+ TimestampNtzNanosecond {
+ precision: u32,
+ builder: TimestampNanosecondBuilder,
+ },
+ TimestampLtzSecond {
+ precision: u32,
+ builder: TimestampSecondBuilder,
+ },
+ TimestampLtzMillisecond {
+ precision: u32,
+ builder: TimestampMillisecondBuilder,
+ },
+ TimestampLtzMicrosecond {
+ precision: u32,
+ builder: TimestampMicrosecondBuilder,
+ },
+ TimestampLtzNanosecond {
+ precision: u32,
+ builder: TimestampNanosecondBuilder,
+ },
+}
+
+/// Dispatch to the inner builder across all `TypedWriter` variants.
+/// Exhaustive matching ensures new variants won't compile without an arm.
+macro_rules! with_builder {
+ ($self:expr, $b:ident => $body:expr) => {
+ match $self {
+ TypedWriter::Bool($b) => $body,
+ TypedWriter::Int8($b) => $body,
+ TypedWriter::Int16($b) => $body,
+ TypedWriter::Int32($b) => $body,
+ TypedWriter::Int64($b) => $body,
+ TypedWriter::Float32($b) => $body,
+ TypedWriter::Float64($b) => $body,
+ TypedWriter::Char { builder: $b, .. } => $body,
+ TypedWriter::String($b) => $body,
+ TypedWriter::Bytes($b) => $body,
+ TypedWriter::Binary { builder: $b, .. } => $body,
+ TypedWriter::Decimal128 { builder: $b, .. } => $body,
+ TypedWriter::Date32($b) => $body,
+ TypedWriter::Time32Second($b) => $body,
+ TypedWriter::Time32Millisecond($b) => $body,
+ TypedWriter::Time64Microsecond($b) => $body,
+ TypedWriter::Time64Nanosecond($b) => $body,
+ TypedWriter::TimestampNtzSecond { builder: $b, .. } => $body,
+ TypedWriter::TimestampNtzMillisecond { builder: $b, .. } => $body,
+ TypedWriter::TimestampNtzMicrosecond { builder: $b, .. } => $body,
+ TypedWriter::TimestampNtzNanosecond { builder: $b, .. } => $body,
+ TypedWriter::TimestampLtzSecond { builder: $b, .. } => $body,
+ TypedWriter::TimestampLtzMillisecond { builder: $b, .. } => $body,
+ TypedWriter::TimestampLtzMicrosecond { builder: $b, .. } => $body,
+ TypedWriter::TimestampLtzNanosecond { builder: $b, .. } => $body,
+ }
+ };
+}
+
+impl ColumnWriter {
+ /// Create a column writer for the given Fluss `DataType` and Arrow
+ /// `ArrowDataType` at position `pos` with the given pre-allocation
+ /// `capacity`.
+ pub fn create(
+ fluss_type: &DataType,
+ arrow_type: &ArrowDataType,
+ pos: usize,
+ capacity: usize,
+ ) -> Result<Self> {
+ let nullable = fluss_type.is_nullable();
+
+ let inner = match fluss_type {
+ DataType::Boolean(_) =>
TypedWriter::Bool(BooleanBuilder::with_capacity(capacity)),
+ DataType::TinyInt(_) =>
TypedWriter::Int8(Int8Builder::with_capacity(capacity)),
+ DataType::SmallInt(_) =>
TypedWriter::Int16(Int16Builder::with_capacity(capacity)),
+ DataType::Int(_) =>
TypedWriter::Int32(Int32Builder::with_capacity(capacity)),
+ DataType::BigInt(_) =>
TypedWriter::Int64(Int64Builder::with_capacity(capacity)),
+ DataType::Float(_) =>
TypedWriter::Float32(Float32Builder::with_capacity(capacity)),
+ DataType::Double(_) =>
TypedWriter::Float64(Float64Builder::with_capacity(capacity)),
+ DataType::Char(t) => TypedWriter::Char {
+ len: t.length() as usize,
+ builder: StringBuilder::with_capacity(
+ capacity,
+ capacity.saturating_mul(VARIABLE_WIDTH_AVG_BYTES),
+ ),
+ },
+ DataType::String(_) =>
TypedWriter::String(StringBuilder::with_capacity(
+ capacity,
+ capacity.saturating_mul(VARIABLE_WIDTH_AVG_BYTES),
+ )),
+ DataType::Bytes(_) =>
TypedWriter::Bytes(BinaryBuilder::with_capacity(
+ capacity,
+ capacity.saturating_mul(VARIABLE_WIDTH_AVG_BYTES),
+ )),
+ DataType::Binary(t) => {
+ let arrow_len: i32 = t.length().try_into().map_err(|_|
Error::IllegalArgument {
+ message: format!(
+ "Binary length {} exceeds Arrow's maximum (i32::MAX)",
+ t.length()
+ ),
+ })?;
+ TypedWriter::Binary {
+ len: t.length(),
+ builder: FixedSizeBinaryBuilder::with_capacity(capacity,
arrow_len),
+ }
+ }
+ DataType::Decimal(dt) => {
+ let (target_p, target_s) = match arrow_type {
+ ArrowDataType::Decimal128(p, s) => (*p, *s),
+ _ => {
+ return Err(Error::IllegalArgument {
+ message: format!(
+ "Expected Decimal128 Arrow type for Decimal,
got: {arrow_type:?}"
+ ),
+ });
+ }
+ };
+ if target_s < 0 {
+ return Err(Error::IllegalArgument {
+ message: format!("Negative decimal scale {target_s} is
not supported"),
+ });
+ }
+ let builder = Decimal128Builder::with_capacity(capacity)
+ .with_precision_and_scale(target_p, target_s)
+ .map_err(|e| Error::IllegalArgument {
+ message: format!(
+ "Invalid decimal precision {target_p} or scale
{target_s}: {e}"
+ ),
+ })?;
+ TypedWriter::Decimal128 {
+ src_precision: dt.precision() as usize,
+ src_scale: dt.scale() as usize,
+ target_precision: target_p as u32,
+ target_scale: target_s as i64,
+ builder,
+ }
+ }
+ DataType::Date(_) =>
TypedWriter::Date32(Date32Builder::with_capacity(capacity)),
+ DataType::Time(_) => match arrow_type {
+ ArrowDataType::Time32(arrow_schema::TimeUnit::Second) => {
+
TypedWriter::Time32Second(Time32SecondBuilder::with_capacity(capacity))
+ }
+ ArrowDataType::Time32(arrow_schema::TimeUnit::Millisecond) => {
+
TypedWriter::Time32Millisecond(Time32MillisecondBuilder::with_capacity(
+ capacity,
+ ))
+ }
+ ArrowDataType::Time64(arrow_schema::TimeUnit::Microsecond) => {
+
TypedWriter::Time64Microsecond(Time64MicrosecondBuilder::with_capacity(
+ capacity,
+ ))
+ }
+ ArrowDataType::Time64(arrow_schema::TimeUnit::Nanosecond) => {
+
TypedWriter::Time64Nanosecond(Time64NanosecondBuilder::with_capacity(capacity))
+ }
+ _ => {
+ return Err(Error::IllegalArgument {
+ message: format!("Unsupported Arrow type for Time:
{arrow_type:?}"),
+ });
+ }
+ },
+ DataType::Timestamp(t) => {
+ let precision = t.precision();
+ match arrow_type {
+ ArrowDataType::Timestamp(arrow_schema::TimeUnit::Second,
_) => {
+ TypedWriter::TimestampNtzSecond {
+ precision,
+ builder:
TimestampSecondBuilder::with_capacity(capacity),
+ }
+ }
+
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => {
+ TypedWriter::TimestampNtzMillisecond {
+ precision,
+ builder:
TimestampMillisecondBuilder::with_capacity(capacity),
+ }
+ }
+
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => {
+ TypedWriter::TimestampNtzMicrosecond {
+ precision,
+ builder:
TimestampMicrosecondBuilder::with_capacity(capacity),
+ }
+ }
+
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => {
+ TypedWriter::TimestampNtzNanosecond {
+ precision,
+ builder:
TimestampNanosecondBuilder::with_capacity(capacity),
+ }
+ }
+ _ => {
+ return Err(Error::IllegalArgument {
+ message: format!(
+ "Unsupported Arrow type for Timestamp:
{arrow_type:?}"
+ ),
+ });
+ }
+ }
+ }
+ DataType::TimestampLTz(t) => {
+ let precision = t.precision();
+ match arrow_type {
+ ArrowDataType::Timestamp(arrow_schema::TimeUnit::Second,
_) => {
+ TypedWriter::TimestampLtzSecond {
+ precision,
+ builder:
TimestampSecondBuilder::with_capacity(capacity),
+ }
+ }
+
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => {
+ TypedWriter::TimestampLtzMillisecond {
+ precision,
+ builder:
TimestampMillisecondBuilder::with_capacity(capacity),
+ }
+ }
+
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => {
+ TypedWriter::TimestampLtzMicrosecond {
+ precision,
+ builder:
TimestampMicrosecondBuilder::with_capacity(capacity),
+ }
+ }
+
ArrowDataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => {
+ TypedWriter::TimestampLtzNanosecond {
+ precision,
+ builder:
TimestampNanosecondBuilder::with_capacity(capacity),
+ }
+ }
+ _ => {
+ return Err(Error::IllegalArgument {
+ message: format!(
+ "Unsupported Arrow type for TimestampLTz:
{arrow_type:?}"
+ ),
+ });
+ }
+ }
+ }
+ _ => {
+ return Err(Error::IllegalArgument {
+ message: format!("Unsupported Fluss DataType:
{fluss_type:?}"),
+ });
+ }
+ };
+
+ Ok(Self {
+ pos,
+ nullable,
+ inner,
+ })
+ }
+
+ /// Read one value from `row` at this writer's column position and append
it
+ /// directly to the concrete Arrow builder.
+ #[inline]
+ pub fn write_field(&mut self, row: &dyn InternalRow) -> Result<()> {
+ if self.nullable && row.is_null_at(self.pos)? {
+ self.append_null();
+ return Ok(());
+ }
+ self.write_non_null(row)
+ }
+
+ /// Finish the builder, producing the final Arrow array.
+ pub fn finish(&mut self) -> ArrayRef {
+ self.as_builder_mut().finish()
+ }
+
+ /// Clone-finish the builder for size estimation (does not reset the
builder).
+ pub fn finish_cloned(&self) -> ArrayRef {
+ self.as_builder_ref().finish_cloned()
+ }
+
+ fn append_null(&mut self) {
+ with_builder!(&mut self.inner, b => b.append_null());
+ }
+
+ /// Returns a trait-object reference to the inner builder.
+ /// Used for type-agnostic operations (`finish`, `finish_cloned`).
+ fn as_builder_mut(&mut self) -> &mut dyn ArrayBuilder {
+ with_builder!(&mut self.inner, b => b)
+ }
+
+ fn as_builder_ref(&self) -> &dyn ArrayBuilder {
+ with_builder!(&self.inner, b => b)
+ }
+
+ #[inline]
+ fn write_non_null(&mut self, row: &dyn InternalRow) -> Result<()> {
+ let pos = self.pos;
+
+ match &mut self.inner {
+ TypedWriter::Bool(b) => {
+ b.append_value(row.get_boolean(pos)?);
+ Ok(())
+ }
+ TypedWriter::Int8(b) => {
+ b.append_value(row.get_byte(pos)?);
+ Ok(())
+ }
+ TypedWriter::Int16(b) => {
+ b.append_value(row.get_short(pos)?);
+ Ok(())
+ }
+ TypedWriter::Int32(b) => {
+ b.append_value(row.get_int(pos)?);
+ Ok(())
+ }
+ TypedWriter::Int64(b) => {
+ b.append_value(row.get_long(pos)?);
+ Ok(())
+ }
+ TypedWriter::Float32(b) => {
+ b.append_value(row.get_float(pos)?);
+ Ok(())
+ }
+ TypedWriter::Float64(b) => {
+ b.append_value(row.get_double(pos)?);
+ Ok(())
+ }
+ TypedWriter::Char { len, builder } => {
+ let v = row.get_char(pos, *len)?;
+ builder.append_value(v);
+ Ok(())
+ }
+ TypedWriter::String(b) => {
+ let v = row.get_string(pos)?;
+ b.append_value(v);
+ Ok(())
+ }
+ TypedWriter::Bytes(b) => {
+ let v = row.get_bytes(pos)?;
+ b.append_value(v);
+ Ok(())
+ }
+ TypedWriter::Binary { len, builder } => {
+ let v = row.get_binary(pos, *len)?;
+ builder.append_value(v).map_err(|e| RowConvertError {
+ message: format!("Failed to append binary value: {e}"),
+ })?;
+ Ok(())
+ }
+ TypedWriter::Decimal128 {
+ src_precision,
+ src_scale,
+ target_precision,
+ target_scale,
+ builder,
+ } => {
+ let decimal = row.get_decimal(pos, *src_precision,
*src_scale)?;
+ append_decimal_to_builder(&decimal, *target_precision,
*target_scale, builder)
+ }
+ TypedWriter::Date32(b) => {
+ let date = row.get_date(pos)?;
+ b.append_value(date.get_inner());
+ Ok(())
+ }
+ TypedWriter::Time32Second(b) => {
+ let millis = row.get_time(pos)?.get_inner();
+ if millis % MILLIS_PER_SECOND as i32 != 0 {
+ return Err(RowConvertError {
+ message: format!(
+ "Time value {millis} ms has sub-second precision
but schema expects seconds only"
+ ),
+ });
+ }
+ b.append_value(millis / MILLIS_PER_SECOND as i32);
+ Ok(())
+ }
+ TypedWriter::Time32Millisecond(b) => {
+ b.append_value(row.get_time(pos)?.get_inner());
+ Ok(())
+ }
+ TypedWriter::Time64Microsecond(b) => {
+ let millis = row.get_time(pos)?.get_inner();
+ let micros = (millis as i64)
+ .checked_mul(MICROS_PER_MILLI)
+ .ok_or_else(|| RowConvertError {
+ message: format!(
+ "Time value {millis} ms overflows when converting
to microseconds"
+ ),
+ })?;
+ b.append_value(micros);
+ Ok(())
+ }
+ TypedWriter::Time64Nanosecond(b) => {
+ let millis = row.get_time(pos)?.get_inner();
+ let nanos = (millis as i64)
+ .checked_mul(NANOS_PER_MILLI)
+ .ok_or_else(|| RowConvertError {
+ message: format!(
+ "Time value {millis} ms overflows when converting
to nanoseconds"
+ ),
+ })?;
+ b.append_value(nanos);
+ Ok(())
+ }
+ // --- TimestampNtz variants ---
+ TypedWriter::TimestampNtzSecond {
+ precision, builder, ..
+ } => {
+ let ts = row.get_timestamp_ntz(pos, *precision)?;
+ builder.append_value(ts.get_millisecond() / MILLIS_PER_SECOND);
+ Ok(())
+ }
+ TypedWriter::TimestampNtzMillisecond {
+ precision, builder, ..
+ } => {
+ let ts = row.get_timestamp_ntz(pos, *precision)?;
+ builder.append_value(ts.get_millisecond());
+ Ok(())
+ }
+ TypedWriter::TimestampNtzMicrosecond {
+ precision, builder, ..
+ } => {
+ let ts = row.get_timestamp_ntz(pos, *precision)?;
+ builder.append_value(millis_nanos_to_micros(
+ ts.get_millisecond(),
+ ts.get_nano_of_millisecond(),
+ )?);
+ Ok(())
+ }
+ TypedWriter::TimestampNtzNanosecond {
+ precision, builder, ..
+ } => {
+ let ts = row.get_timestamp_ntz(pos, *precision)?;
+ builder.append_value(millis_nanos_to_nanos(
+ ts.get_millisecond(),
+ ts.get_nano_of_millisecond(),
+ )?);
+ Ok(())
+ }
+ // --- TimestampLtz variants ---
+ TypedWriter::TimestampLtzSecond {
+ precision, builder, ..
+ } => {
+ let ts = row.get_timestamp_ltz(pos, *precision)?;
+ builder.append_value(ts.get_epoch_millisecond() /
MILLIS_PER_SECOND);
+ Ok(())
+ }
+ TypedWriter::TimestampLtzMillisecond {
+ precision, builder, ..
+ } => {
+ let ts = row.get_timestamp_ltz(pos, *precision)?;
+ builder.append_value(ts.get_epoch_millisecond());
+ Ok(())
+ }
+ TypedWriter::TimestampLtzMicrosecond {
+ precision, builder, ..
+ } => {
+ let ts = row.get_timestamp_ltz(pos, *precision)?;
+ builder.append_value(millis_nanos_to_micros(
+ ts.get_epoch_millisecond(),
+ ts.get_nano_of_millisecond(),
+ )?);
+ Ok(())
+ }
+ TypedWriter::TimestampLtzNanosecond {
+ precision, builder, ..
+ } => {
+ let ts = row.get_timestamp_ltz(pos, *precision)?;
+ builder.append_value(millis_nanos_to_nanos(
+ ts.get_epoch_millisecond(),
+ ts.get_nano_of_millisecond(),
+ )?);
+ Ok(())
+ }
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::metadata::DataTypes;
+ use crate::record::to_arrow_type;
+ use crate::row::{Date, Datum, GenericRow, Time, TimestampLtz,
TimestampNtz};
+ use arrow::array::*;
+ use bigdecimal::BigDecimal;
+ use std::str::FromStr;
+
+ /// Helper: create a ColumnWriter from a Fluss DataType, deriving the
Arrow type automatically.
+ fn writer_for(fluss_type: &DataType, capacity: usize) -> ColumnWriter {
+ let arrow_type = to_arrow_type(fluss_type).unwrap();
+ ColumnWriter::create(fluss_type, &arrow_type, 0, capacity).unwrap()
+ }
+
+ /// Helper: write a single datum and return the finished array.
+ fn write_one(fluss_type: &DataType, datum: Datum) -> ArrayRef {
+ let mut w = writer_for(fluss_type, 4);
+ w.write_field(&GenericRow::from_data(vec![datum])).unwrap();
+ w.finish()
+ }
+
+ #[test]
+ fn write_all_scalar_types() {
+ // Boolean
+ let arr = write_one(&DataTypes::boolean(), Datum::Bool(true));
+ assert!(
+ arr.as_any()
+ .downcast_ref::<BooleanArray>()
+ .unwrap()
+ .value(0)
+ );
+
+ // Integer types
+ let arr = write_one(&DataTypes::tinyint(), Datum::Int8(42));
+ assert_eq!(
+ arr.as_any().downcast_ref::<Int8Array>().unwrap().value(0),
+ 42
+ );
+
+ let arr = write_one(&DataTypes::smallint(), Datum::Int16(1000));
+ assert_eq!(
+ arr.as_any().downcast_ref::<Int16Array>().unwrap().value(0),
+ 1000
+ );
+
+ let arr = write_one(&DataTypes::int(), Datum::Int32(100_000));
+ assert_eq!(
+ arr.as_any().downcast_ref::<Int32Array>().unwrap().value(0),
+ 100_000
+ );
+
+ let arr = write_one(&DataTypes::bigint(), Datum::Int64(9_000_000_000));
+ assert_eq!(
+ arr.as_any().downcast_ref::<Int64Array>().unwrap().value(0),
+ 9_000_000_000
+ );
+
+ // Float types
+ let arr = write_one(&DataTypes::float(), Datum::Float32(1.5.into()));
+ assert!(
+ (arr.as_any()
+ .downcast_ref::<Float32Array>()
+ .unwrap()
+ .value(0)
+ - 1.5)
+ .abs()
+ < 0.001
+ );
+
+ let arr = write_one(&DataTypes::double(),
Datum::Float64(1.125.into()));
+ assert!(
+ (arr.as_any()
+ .downcast_ref::<Float64Array>()
+ .unwrap()
+ .value(0)
+ - 1.125)
+ .abs()
+ < 0.001
+ );
+
+ // String / Char
+ let arr = write_one(&DataTypes::string(),
Datum::String("hello".into()));
+ assert_eq!(
+ arr.as_any().downcast_ref::<StringArray>().unwrap().value(0),
+ "hello"
+ );
+
+ let arr = write_one(&DataTypes::char(10),
Datum::String("world".into()));
+ assert_eq!(
+ arr.as_any().downcast_ref::<StringArray>().unwrap().value(0),
+ "world"
+ );
+
+ // Bytes / Binary
+ let arr = write_one(&DataTypes::bytes(), Datum::Blob(vec![1, 2,
3].into()));
+ assert_eq!(
+ arr.as_any().downcast_ref::<BinaryArray>().unwrap().value(0),
+ &[1, 2, 3]
+ );
+
+ let arr = write_one(
+ &DataTypes::binary(4),
+ Datum::Blob(vec![10, 20, 30, 40].into()),
+ );
+ assert_eq!(
+ arr.as_any()
+ .downcast_ref::<FixedSizeBinaryArray>()
+ .unwrap()
+ .value(0),
+ &[10, 20, 30, 40]
+ );
+
+ // Date
+ let arr = write_one(&DataTypes::date(), Datum::Date(Date::new(19000)));
+ assert_eq!(
+ arr.as_any().downcast_ref::<Date32Array>().unwrap().value(0),
+ 19000
+ );
+
+ // Time (precision 3 → Millisecond)
+ let arr = write_one(
+ &DataTypes::time_with_precision(3),
+ Datum::Time(Time::new(45_000)),
+ );
+ assert_eq!(
+ arr.as_any()
+ .downcast_ref::<Time32MillisecondArray>()
+ .unwrap()
+ .value(0),
+ 45_000
+ );
+
+ // Decimal
+ let decimal =
+
crate::row::Decimal::from_big_decimal(BigDecimal::from_str("123.45").unwrap(),
10, 2)
+ .unwrap();
+ let arr = write_one(&DataTypes::decimal(10, 2),
Datum::Decimal(decimal));
+ assert_eq!(
+ arr.as_any()
+ .downcast_ref::<Decimal128Array>()
+ .unwrap()
+ .value(0),
+ 12345
+ );
+
+ // Timestamp NTZ (precision 3 → Millisecond)
+ let arr = write_one(
+ &DataTypes::timestamp_with_precision(3),
+ Datum::TimestampNtz(TimestampNtz::new(1_700_000_000_000)),
+ );
+ assert_eq!(
+ arr.as_any()
+ .downcast_ref::<TimestampMillisecondArray>()
+ .unwrap()
+ .value(0),
+ 1_700_000_000_000
+ );
+
+ // Timestamp LTZ (precision 3 → Millisecond)
+ let arr = write_one(
+ &DataTypes::timestamp_ltz_with_precision(3),
+ Datum::TimestampLtz(TimestampLtz::new(1_700_000_000_000)),
+ );
+ assert_eq!(
+ arr.as_any()
+ .downcast_ref::<TimestampMillisecondArray>()
+ .unwrap()
+ .value(0),
+ 1_700_000_000_000
+ );
+ }
+
+ #[test]
+ fn write_null_and_multiple_rows() {
+ // Null
+ let arr = write_one(&DataTypes::int(), Datum::Null);
+ assert!(arr.is_null(0));
+
+ // Multiple rows
+ let mut w = writer_for(&DataTypes::int(), 8);
+ for val in [10, 20, 30] {
+ w.write_field(&GenericRow::from_data(vec![val])).unwrap();
+ }
+ let arr = w.finish();
+ let int_arr = arr.as_any().downcast_ref::<Int32Array>().unwrap();
+ assert_eq!(int_arr.len(), 3);
+ assert_eq!(int_arr.value(0), 10);
+ assert_eq!(int_arr.value(1), 20);
+ assert_eq!(int_arr.value(2), 30);
+
+ // finish_cloned does not reset
+ let mut w = writer_for(&DataTypes::int(), 4);
+ w.write_field(&GenericRow::from_data(vec![42_i32])).unwrap();
+ assert_eq!(w.finish_cloned().len(), 1);
+ w.write_field(&GenericRow::from_data(vec![99_i32])).unwrap();
+ let int_arr = w
+ .finish()
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap()
+ .clone();
+ assert_eq!((int_arr.value(0), int_arr.value(1)), (42, 99));
+ }
+
+ #[test]
+ fn unsupported_type_returns_error() {
+ let fluss_type = DataTypes::array(DataTypes::int());
+ let arrow_type = ArrowDataType::List(arrow_schema::FieldRef::new(
+ arrow_schema::Field::new("item", ArrowDataType::Int32, true),
+ ));
+ assert!(ColumnWriter::create(&fluss_type, &arrow_type, 0, 4).is_err());
+ }
+}
diff --git a/crates/fluss/src/row/datum.rs b/crates/fluss/src/row/datum.rs
index b370fb1..9b2e80a 100644
--- a/crates/fluss/src/row/datum.rs
+++ b/crates/fluss/src/row/datum.rs
@@ -397,13 +397,13 @@ pub trait ToArrow {
}
// Time unit conversion constants
-const MILLIS_PER_SECOND: i64 = 1_000;
-const MICROS_PER_MILLI: i64 = 1_000;
-const NANOS_PER_MILLI: i64 = 1_000_000;
+pub(crate) const MILLIS_PER_SECOND: i64 = 1_000;
+pub(crate) const MICROS_PER_MILLI: i64 = 1_000;
+pub(crate) const NANOS_PER_MILLI: i64 = 1_000_000;
/// Converts milliseconds and nanoseconds-within-millisecond to total microseconds.
/// Returns an error if the conversion would overflow.
-fn millis_nanos_to_micros(millis: i64, nanos: i32) -> Result<i64> {
+pub(crate) fn millis_nanos_to_micros(millis: i64, nanos: i32) -> Result<i64> {
let millis_micros = millis
.checked_mul(MICROS_PER_MILLI)
.ok_or_else(|| RowConvertError {
@@ -423,7 +423,7 @@ fn millis_nanos_to_micros(millis: i64, nanos: i32) -> Result<i64> {
/// Converts milliseconds and nanoseconds-within-millisecond to total nanoseconds.
/// Returns an error if the conversion would overflow.
-fn millis_nanos_to_nanos(millis: i64, nanos: i32) -> Result<i64> {
+pub(crate) fn millis_nanos_to_nanos(millis: i64, nanos: i32) -> Result<i64> {
let millis_nanos = millis
.checked_mul(NANOS_PER_MILLI)
.ok_or_else(|| RowConvertError {
@@ -440,6 +440,42 @@ fn millis_nanos_to_nanos(millis: i64, nanos: i32) -> Result<i64> {
})
}
+/// Rescales a [`Decimal`] to the given Arrow target precision/scale and appends
+/// the resulting i128 to the builder.
+///
+/// # Errors
+/// Returns `RowConvertError` if the value, rounded half-up to `target_scale`, needs
+/// more than `target_precision` digits or does not fit in an `i128`.
+pub(crate) fn append_decimal_to_builder(
+    decimal: &Decimal,
+    target_precision: u32,
+    target_scale: i64,
+    builder: &mut Decimal128Builder,
+) -> Result<()> {
+    use bigdecimal::RoundingMode;
+
+    let bd = decimal.to_big_decimal();
+    let rescaled = bd.with_scale_round(target_scale, RoundingMode::HalfUp);
+    let (unscaled, _) = rescaled.as_bigint_and_exponent();
+
+    let actual_precision = Decimal::compute_precision(&unscaled);
+    if actual_precision > target_precision as usize {
+        return Err(RowConvertError {
+            message: format!(
+                "Decimal precision overflow: value has {actual_precision} digits but Arrow expects {target_precision} (value: {rescaled})"
+            ),
+        });
+    }
+
+    // The digit check above only bounds the value to i128 when target_precision <= 38.
+    let i128_val = i128::try_from(unscaled).map_err(|_| RowConvertError {
+        message: format!("Decimal value exceeds i128 range: {rescaled}"),
+    })?;
+
+    builder.append_value(i128_val);
+    Ok(())
+}
+
trait AppendResult {
fn into_append_result(self) -> Result<()>;
}
@@ -539,45 +575,14 @@ impl Datum<'_> {
}
};
- // Validate scale is non-negative (Fluss doesn't support negative scales)
if s < 0 {
return Err(RowConvertError {
message: format!("Negative decimal scale {s} is not
supported"),
});
}
- let target_precision = p as u32;
- let target_scale = s as i64; // Safe now: 0..127 → 0i64..127i64
-
if let Some(b) =
builder.as_any_mut().downcast_mut::<Decimal128Builder>() {
- use bigdecimal::RoundingMode;
-
- // Rescale the decimal to match Arrow's target scale
- let bd = decimal.to_big_decimal();
- let rescaled = bd.with_scale_round(target_scale,
RoundingMode::HalfUp);
- let (unscaled, _) = rescaled.as_bigint_and_exponent();
-
- // Validate precision
- let actual_precision =
Decimal::compute_precision(&unscaled);
- if actual_precision > target_precision as usize {
- return Err(RowConvertError {
- message: format!(
- "Decimal precision overflow: value has
{actual_precision} digits but Arrow expects {target_precision} (value:
{rescaled})"
- ),
- });
- }
-
- // Convert to i128 for Arrow
- let i128_val: i128 = match unscaled.try_into() {
- Ok(v) => v,
- Err(_) => {
- return Err(RowConvertError {
- message: format!("Decimal value exceeds i128
range: {rescaled}"),
- });
- }
- };
-
- b.append_value(i128_val);
+ append_decimal_to_builder(decimal, p as u32, s as i64, b)?;
return Ok(());
}
diff --git a/crates/fluss/src/row/mod.rs b/crates/fluss/src/row/mod.rs
index 8fb777d..ef99ba2 100644
--- a/crates/fluss/src/row/mod.rs
+++ b/crates/fluss/src/row/mod.rs
@@ -17,10 +17,11 @@
mod column;
-mod datum;
+pub(crate) mod datum;
mod decimal;
pub mod binary;
+pub(crate) mod column_writer;
pub mod compacted;
pub mod encode;
pub mod field_getter;