This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 251c1a9  chore: pre-size Arrow builders and recommend jemalloc for write path (#430)
251c1a9 is described below

commit 251c1a91af315381239f4c87493baba224a82238
Author: Anton Borisov <[email protected]>
AuthorDate: Sun Mar 8 09:51:04 2026 +0000

    chore: pre-size Arrow builders and recommend jemalloc for write path (#430)
---
 crates/examples/Cargo.toml           |  4 ++
 crates/examples/src/example_table.rs |  4 ++
 crates/fluss/Cargo.toml              |  1 -
 crates/fluss/src/lib.rs              | 19 +++++++
 crates/fluss/src/record/arrow.rs     | 96 +++++++++++++++++++++++-------------
 5 files changed, 88 insertions(+), 36 deletions(-)

diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml
index 26251cc..b918739 100644
--- a/crates/examples/Cargo.toml
+++ b/crates/examples/Cargo.toml
@@ -27,6 +27,10 @@ version = { workspace = true }
 fluss = { workspace = true, features = ["storage-all"] }
 tokio = { workspace = true }
 clap = { workspace = true }
+
+[target.'cfg(not(target_env = "msvc"))'.dependencies]
+tikv-jemallocator = "0.6"
+
 [[example]]
 name = "example-table"
 path = "src/example_table.rs"
diff --git a/crates/examples/src/example_table.rs b/crates/examples/src/example_table.rs
index e4ad1fb..49f0ab4 100644
--- a/crates/examples/src/example_table.rs
+++ b/crates/examples/src/example_table.rs
@@ -15,6 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#[cfg(not(target_env = "msvc"))]
+#[global_allocator]
+static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
+
 mod example_kv_table;
 mod example_partitioned_kv_table;
 
diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml
index db1348a..c0ba6f8 100644
--- a/crates/fluss/Cargo.toml
+++ b/crates/fluss/Cargo.toml
@@ -80,6 +80,5 @@ jiff = { workspace = true, features = ["js"] }
 testcontainers = "0.25.0"
 test-env-helpers = "0.2.2"
 
-
 [build-dependencies]
 prost-build = { version = "0.14" }
diff --git a/crates/fluss/src/lib.rs b/crates/fluss/src/lib.rs
index cd060c8..13e8598 100644
--- a/crates/fluss/src/lib.rs
+++ b/crates/fluss/src/lib.rs
@@ -101,6 +101,25 @@
 //!     Ok(())
 //! }
 //! ```
+//!
+//! # Performance
+//!
+//! For production deployments on Linux, we recommend using
+//! [jemalloc](https://crates.io/crates/tikv-jemallocator) as the global allocator.
+//! The default glibc allocator (ptmalloc2) can cause RSS bloat and fragmentation under
+//! sustained write loads due to repeated same-size alloc/free cycles in Arrow batch building.
+//! jemalloc's thread-local size-class bins handle this pattern efficiently.
+//!
+//! ```toml
+//! [target.'cfg(not(target_env = "msvc"))'.dependencies]
+//! tikv-jemallocator = "0.6"
+//! ```
+//!
+//! ```rust,ignore
+//! #[cfg(not(target_env = "msvc"))]
+//! #[global_allocator]
+//! static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
+//! ```
 
 pub mod client;
 pub mod metadata;
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index ea27836..7fd1619 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -151,8 +151,13 @@ pub const NO_BATCH_SEQUENCE: i32 = -1;
 
 pub const BUILDER_DEFAULT_OFFSET: i64 = 0;
 
+// TODO: Switch to byte-size-based is_full() like Java's ArrowWriter instead of a hard record cap.
 pub const DEFAULT_MAX_RECORD: i32 = 256;
 
+/// Estimated average byte size for variable-width columns (Utf8, Binary).
+/// Used to pre-allocate data buffers and avoid reallocations during batch building.
+const VARIABLE_WIDTH_AVG_BYTES: usize = 64;
+
 pub struct MemoryLogRecordsArrowBuilder {
     base_log_offset: i64,
     schema_id: i32,
@@ -236,11 +241,12 @@ pub struct RowAppendRecordBatchBuilder {
 
 impl RowAppendRecordBatchBuilder {
     pub fn new(row_type: &RowType) -> Result<Self> {
+        let capacity = DEFAULT_MAX_RECORD as usize;
         let schema_ref = to_arrow_schema(row_type)?;
         let builders: Result<Vec<_>> = schema_ref
             .fields()
             .iter()
-            .map(|field| Self::create_builder(field.data_type()))
+            .map(|field| Self::create_builder(field.data_type(), capacity))
             .collect();
         let field_getters = FieldGetter::create_field_getters(row_type);
         Ok(Self {
@@ -251,26 +257,41 @@ impl RowAppendRecordBatchBuilder {
         })
     }
 
-    fn create_builder(data_type: &arrow_schema::DataType) -> Result<Box<dyn 
ArrayBuilder>> {
+    fn create_builder(
+        data_type: &arrow_schema::DataType,
+        capacity: usize,
+    ) -> Result<Box<dyn ArrayBuilder>> {
         match data_type {
-            arrow_schema::DataType::Int8 => Ok(Box::new(Int8Builder::new())),
-            arrow_schema::DataType::Int16 => Ok(Box::new(Int16Builder::new())),
-            arrow_schema::DataType::Int32 => Ok(Box::new(Int32Builder::new())),
-            arrow_schema::DataType::Int64 => Ok(Box::new(Int64Builder::new())),
-            arrow_schema::DataType::UInt8 => Ok(Box::new(UInt8Builder::new())),
-            arrow_schema::DataType::UInt16 => 
Ok(Box::new(UInt16Builder::new())),
-            arrow_schema::DataType::UInt32 => 
Ok(Box::new(UInt32Builder::new())),
-            arrow_schema::DataType::UInt64 => 
Ok(Box::new(UInt64Builder::new())),
-            arrow_schema::DataType::Float32 => 
Ok(Box::new(Float32Builder::new())),
-            arrow_schema::DataType::Float64 => 
Ok(Box::new(Float64Builder::new())),
-            arrow_schema::DataType::Boolean => 
Ok(Box::new(BooleanBuilder::new())),
-            arrow_schema::DataType::Utf8 => Ok(Box::new(StringBuilder::new())),
-            arrow_schema::DataType::Binary => 
Ok(Box::new(BinaryBuilder::new())),
-            arrow_schema::DataType::FixedSizeBinary(size) => {
-                Ok(Box::new(FixedSizeBinaryBuilder::new(*size)))
+            arrow_schema::DataType::Int8 => 
Ok(Box::new(Int8Builder::with_capacity(capacity))),
+            arrow_schema::DataType::Int16 => 
Ok(Box::new(Int16Builder::with_capacity(capacity))),
+            arrow_schema::DataType::Int32 => 
Ok(Box::new(Int32Builder::with_capacity(capacity))),
+            arrow_schema::DataType::Int64 => 
Ok(Box::new(Int64Builder::with_capacity(capacity))),
+            arrow_schema::DataType::UInt8 => 
Ok(Box::new(UInt8Builder::with_capacity(capacity))),
+            arrow_schema::DataType::UInt16 => 
Ok(Box::new(UInt16Builder::with_capacity(capacity))),
+            arrow_schema::DataType::UInt32 => 
Ok(Box::new(UInt32Builder::with_capacity(capacity))),
+            arrow_schema::DataType::UInt64 => 
Ok(Box::new(UInt64Builder::with_capacity(capacity))),
+            arrow_schema::DataType::Float32 => {
+                Ok(Box::new(Float32Builder::with_capacity(capacity)))
+            }
+            arrow_schema::DataType::Float64 => {
+                Ok(Box::new(Float64Builder::with_capacity(capacity)))
+            }
+            arrow_schema::DataType::Boolean => {
+                Ok(Box::new(BooleanBuilder::with_capacity(capacity)))
             }
+            arrow_schema::DataType::Utf8 => 
Ok(Box::new(StringBuilder::with_capacity(
+                capacity,
+                capacity * VARIABLE_WIDTH_AVG_BYTES,
+            ))),
+            arrow_schema::DataType::Binary => 
Ok(Box::new(BinaryBuilder::with_capacity(
+                capacity,
+                capacity * VARIABLE_WIDTH_AVG_BYTES,
+            ))),
+            arrow_schema::DataType::FixedSizeBinary(size) => Ok(Box::new(
+                FixedSizeBinaryBuilder::with_capacity(capacity, *size),
+            )),
             arrow_schema::DataType::Decimal128(precision, scale) => {
-                let builder = Decimal128Builder::new()
+                let builder = Decimal128Builder::with_capacity(capacity)
                     .with_precision_and_scale(*precision, *scale)
                     .map_err(|e| Error::IllegalArgument {
                         message: format!(
@@ -279,11 +300,13 @@ impl RowAppendRecordBatchBuilder {
                     })?;
                 Ok(Box::new(builder))
             }
-            arrow_schema::DataType::Date32 => 
Ok(Box::new(Date32Builder::new())),
+            arrow_schema::DataType::Date32 => 
Ok(Box::new(Date32Builder::with_capacity(capacity))),
             arrow_schema::DataType::Time32(unit) => match unit {
-                arrow_schema::TimeUnit::Second => 
Ok(Box::new(Time32SecondBuilder::new())),
+                arrow_schema::TimeUnit::Second => {
+                    Ok(Box::new(Time32SecondBuilder::with_capacity(capacity)))
+                }
                 arrow_schema::TimeUnit::Millisecond => {
-                    Ok(Box::new(Time32MillisecondBuilder::new()))
+                    
Ok(Box::new(Time32MillisecondBuilder::with_capacity(capacity)))
                 }
                 _ => Err(Error::IllegalArgument {
                     message: format!(
@@ -293,9 +316,11 @@ impl RowAppendRecordBatchBuilder {
             },
             arrow_schema::DataType::Time64(unit) => match unit {
                 arrow_schema::TimeUnit::Microsecond => {
-                    Ok(Box::new(Time64MicrosecondBuilder::new()))
+                    
Ok(Box::new(Time64MicrosecondBuilder::with_capacity(capacity)))
+                }
+                arrow_schema::TimeUnit::Nanosecond => {
+                    
Ok(Box::new(Time64NanosecondBuilder::with_capacity(capacity)))
                 }
-                arrow_schema::TimeUnit::Nanosecond => 
Ok(Box::new(Time64NanosecondBuilder::new())),
                 _ => Err(Error::IllegalArgument {
                     message: format!(
                         "Time64 only supports Microsecond and Nanosecond 
units, got: {unit:?}"
@@ -303,17 +328,17 @@ impl RowAppendRecordBatchBuilder {
                 }),
             },
             arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Second, 
_) => {
-                Ok(Box::new(TimestampSecondBuilder::new()))
-            }
-            
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => {
-                Ok(Box::new(TimestampMillisecondBuilder::new()))
-            }
-            
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => {
-                Ok(Box::new(TimestampMicrosecondBuilder::new()))
-            }
-            
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => {
-                Ok(Box::new(TimestampNanosecondBuilder::new()))
+                Ok(Box::new(TimestampSecondBuilder::with_capacity(capacity)))
             }
+            
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => Ok(
+                Box::new(TimestampMillisecondBuilder::with_capacity(capacity)),
+            ),
+            
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => Ok(
+                Box::new(TimestampMicrosecondBuilder::with_capacity(capacity)),
+            ),
+            
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => Ok(
+                Box::new(TimestampNanosecondBuilder::with_capacity(capacity)),
+            ),
             dt => Err(Error::IllegalArgument {
                 message: format!("Unsupported data type: {dt:?}"),
             }),
@@ -1701,7 +1726,8 @@ mod tests {
 
         // Test valid builder creation with precision=10, scale=2
         let mut builder =
-            
RowAppendRecordBatchBuilder::create_builder(&ArrowDataType::Decimal128(10, 
2)).unwrap();
+            
RowAppendRecordBatchBuilder::create_builder(&ArrowDataType::Decimal128(10, 2), 
256)
+                .unwrap();
         let decimal_builder = builder
             .as_any_mut()
             .downcast_mut::<Decimal128Builder>()
@@ -1712,7 +1738,7 @@ mod tests {
 
         // Test error case: invalid precision/scale
         let result =
-            
RowAppendRecordBatchBuilder::create_builder(&ArrowDataType::Decimal128(100, 
50));
+            
RowAppendRecordBatchBuilder::create_builder(&ArrowDataType::Decimal128(100, 
50), 256);
         assert!(result.is_err());
     }
 

Reply via email to