This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 251c1a9 chore: pre-size Arrow builders and recommend jemalloc for write path (#430)
251c1a9 is described below
commit 251c1a91af315381239f4c87493baba224a82238
Author: Anton Borisov <[email protected]>
AuthorDate: Sun Mar 8 09:51:04 2026 +0000
chore: pre-size Arrow builders and recommend jemalloc for write path (#430)
---
crates/examples/Cargo.toml | 4 ++
crates/examples/src/example_table.rs | 4 ++
crates/fluss/Cargo.toml | 1 -
crates/fluss/src/lib.rs | 19 +++++++
crates/fluss/src/record/arrow.rs | 96 +++++++++++++++++++++++-------------
5 files changed, 88 insertions(+), 36 deletions(-)
diff --git a/crates/examples/Cargo.toml b/crates/examples/Cargo.toml
index 26251cc..b918739 100644
--- a/crates/examples/Cargo.toml
+++ b/crates/examples/Cargo.toml
@@ -27,6 +27,10 @@ version = { workspace = true }
fluss = { workspace = true, features = ["storage-all"] }
tokio = { workspace = true }
clap = { workspace = true }
+
+[target.'cfg(not(target_env = "msvc"))'.dependencies]
+tikv-jemallocator = "0.6"
+
[[example]]
name = "example-table"
path = "src/example_table.rs"
diff --git a/crates/examples/src/example_table.rs b/crates/examples/src/example_table.rs
index e4ad1fb..49f0ab4 100644
--- a/crates/examples/src/example_table.rs
+++ b/crates/examples/src/example_table.rs
@@ -15,6 +15,10 @@
// specific language governing permissions and limitations
// under the License.
+#[cfg(not(target_env = "msvc"))]
+#[global_allocator]
+static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
+
mod example_kv_table;
mod example_partitioned_kv_table;
diff --git a/crates/fluss/Cargo.toml b/crates/fluss/Cargo.toml
index db1348a..c0ba6f8 100644
--- a/crates/fluss/Cargo.toml
+++ b/crates/fluss/Cargo.toml
@@ -80,6 +80,5 @@ jiff = { workspace = true, features = ["js"] }
testcontainers = "0.25.0"
test-env-helpers = "0.2.2"
-
[build-dependencies]
prost-build = { version = "0.14" }
diff --git a/crates/fluss/src/lib.rs b/crates/fluss/src/lib.rs
index cd060c8..13e8598 100644
--- a/crates/fluss/src/lib.rs
+++ b/crates/fluss/src/lib.rs
@@ -101,6 +101,25 @@
//! Ok(())
//! }
//! ```
+//!
+//! # Performance
+//!
+//! For production deployments on Linux, we recommend using
+//! [jemalloc](https://crates.io/crates/tikv-jemallocator) as the global allocator.
+//! The default glibc allocator (ptmalloc2) can cause RSS bloat and fragmentation under
+//! sustained write loads due to repeated same-size alloc/free cycles in Arrow batch building.
+//! jemalloc's thread-local size-class bins handle this pattern efficiently.
+//!
+//! ```toml
+//! [target.'cfg(not(target_env = "msvc"))'.dependencies]
+//! tikv-jemallocator = "0.6"
+//! ```
+//!
+//! ```rust,ignore
+//! #[cfg(not(target_env = "msvc"))]
+//! #[global_allocator]
+//! static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
+//! ```
pub mod client;
pub mod metadata;
diff --git a/crates/fluss/src/record/arrow.rs b/crates/fluss/src/record/arrow.rs
index ea27836..7fd1619 100644
--- a/crates/fluss/src/record/arrow.rs
+++ b/crates/fluss/src/record/arrow.rs
@@ -151,8 +151,13 @@ pub const NO_BATCH_SEQUENCE: i32 = -1;
pub const BUILDER_DEFAULT_OFFSET: i64 = 0;
+// TODO: Switch to byte-size-based is_full() like Java's ArrowWriter instead of a hard record cap.
pub const DEFAULT_MAX_RECORD: i32 = 256;
+/// Estimated average byte size for variable-width columns (Utf8, Binary).
+/// Used to pre-allocate data buffers and avoid reallocations during batch building.
+const VARIABLE_WIDTH_AVG_BYTES: usize = 64;
+
pub struct MemoryLogRecordsArrowBuilder {
base_log_offset: i64,
schema_id: i32,
@@ -236,11 +241,12 @@ pub struct RowAppendRecordBatchBuilder {
impl RowAppendRecordBatchBuilder {
pub fn new(row_type: &RowType) -> Result<Self> {
+ let capacity = DEFAULT_MAX_RECORD as usize;
let schema_ref = to_arrow_schema(row_type)?;
let builders: Result<Vec<_>> = schema_ref
.fields()
.iter()
- .map(|field| Self::create_builder(field.data_type()))
+ .map(|field| Self::create_builder(field.data_type(), capacity))
.collect();
let field_getters = FieldGetter::create_field_getters(row_type);
Ok(Self {
@@ -251,26 +257,41 @@ impl RowAppendRecordBatchBuilder {
})
}
- fn create_builder(data_type: &arrow_schema::DataType) -> Result<Box<dyn
ArrayBuilder>> {
+ fn create_builder(
+ data_type: &arrow_schema::DataType,
+ capacity: usize,
+ ) -> Result<Box<dyn ArrayBuilder>> {
match data_type {
- arrow_schema::DataType::Int8 => Ok(Box::new(Int8Builder::new())),
- arrow_schema::DataType::Int16 => Ok(Box::new(Int16Builder::new())),
- arrow_schema::DataType::Int32 => Ok(Box::new(Int32Builder::new())),
- arrow_schema::DataType::Int64 => Ok(Box::new(Int64Builder::new())),
- arrow_schema::DataType::UInt8 => Ok(Box::new(UInt8Builder::new())),
- arrow_schema::DataType::UInt16 =>
Ok(Box::new(UInt16Builder::new())),
- arrow_schema::DataType::UInt32 =>
Ok(Box::new(UInt32Builder::new())),
- arrow_schema::DataType::UInt64 =>
Ok(Box::new(UInt64Builder::new())),
- arrow_schema::DataType::Float32 =>
Ok(Box::new(Float32Builder::new())),
- arrow_schema::DataType::Float64 =>
Ok(Box::new(Float64Builder::new())),
- arrow_schema::DataType::Boolean =>
Ok(Box::new(BooleanBuilder::new())),
- arrow_schema::DataType::Utf8 => Ok(Box::new(StringBuilder::new())),
- arrow_schema::DataType::Binary =>
Ok(Box::new(BinaryBuilder::new())),
- arrow_schema::DataType::FixedSizeBinary(size) => {
- Ok(Box::new(FixedSizeBinaryBuilder::new(*size)))
+ arrow_schema::DataType::Int8 =>
Ok(Box::new(Int8Builder::with_capacity(capacity))),
+ arrow_schema::DataType::Int16 =>
Ok(Box::new(Int16Builder::with_capacity(capacity))),
+ arrow_schema::DataType::Int32 =>
Ok(Box::new(Int32Builder::with_capacity(capacity))),
+ arrow_schema::DataType::Int64 =>
Ok(Box::new(Int64Builder::with_capacity(capacity))),
+ arrow_schema::DataType::UInt8 =>
Ok(Box::new(UInt8Builder::with_capacity(capacity))),
+ arrow_schema::DataType::UInt16 =>
Ok(Box::new(UInt16Builder::with_capacity(capacity))),
+ arrow_schema::DataType::UInt32 =>
Ok(Box::new(UInt32Builder::with_capacity(capacity))),
+ arrow_schema::DataType::UInt64 =>
Ok(Box::new(UInt64Builder::with_capacity(capacity))),
+ arrow_schema::DataType::Float32 => {
+ Ok(Box::new(Float32Builder::with_capacity(capacity)))
+ }
+ arrow_schema::DataType::Float64 => {
+ Ok(Box::new(Float64Builder::with_capacity(capacity)))
+ }
+ arrow_schema::DataType::Boolean => {
+ Ok(Box::new(BooleanBuilder::with_capacity(capacity)))
}
+ arrow_schema::DataType::Utf8 =>
Ok(Box::new(StringBuilder::with_capacity(
+ capacity,
+ capacity * VARIABLE_WIDTH_AVG_BYTES,
+ ))),
+ arrow_schema::DataType::Binary =>
Ok(Box::new(BinaryBuilder::with_capacity(
+ capacity,
+ capacity * VARIABLE_WIDTH_AVG_BYTES,
+ ))),
+ arrow_schema::DataType::FixedSizeBinary(size) => Ok(Box::new(
+ FixedSizeBinaryBuilder::with_capacity(capacity, *size),
+ )),
arrow_schema::DataType::Decimal128(precision, scale) => {
- let builder = Decimal128Builder::new()
+ let builder = Decimal128Builder::with_capacity(capacity)
.with_precision_and_scale(*precision, *scale)
.map_err(|e| Error::IllegalArgument {
message: format!(
@@ -279,11 +300,13 @@ impl RowAppendRecordBatchBuilder {
})?;
Ok(Box::new(builder))
}
- arrow_schema::DataType::Date32 =>
Ok(Box::new(Date32Builder::new())),
+ arrow_schema::DataType::Date32 =>
Ok(Box::new(Date32Builder::with_capacity(capacity))),
arrow_schema::DataType::Time32(unit) => match unit {
- arrow_schema::TimeUnit::Second =>
Ok(Box::new(Time32SecondBuilder::new())),
+ arrow_schema::TimeUnit::Second => {
+ Ok(Box::new(Time32SecondBuilder::with_capacity(capacity)))
+ }
arrow_schema::TimeUnit::Millisecond => {
- Ok(Box::new(Time32MillisecondBuilder::new()))
+
Ok(Box::new(Time32MillisecondBuilder::with_capacity(capacity)))
}
_ => Err(Error::IllegalArgument {
message: format!(
@@ -293,9 +316,11 @@ impl RowAppendRecordBatchBuilder {
},
arrow_schema::DataType::Time64(unit) => match unit {
arrow_schema::TimeUnit::Microsecond => {
- Ok(Box::new(Time64MicrosecondBuilder::new()))
+
Ok(Box::new(Time64MicrosecondBuilder::with_capacity(capacity)))
+ }
+ arrow_schema::TimeUnit::Nanosecond => {
+
Ok(Box::new(Time64NanosecondBuilder::with_capacity(capacity)))
}
- arrow_schema::TimeUnit::Nanosecond =>
Ok(Box::new(Time64NanosecondBuilder::new())),
_ => Err(Error::IllegalArgument {
message: format!(
"Time64 only supports Microsecond and Nanosecond
units, got: {unit:?}"
@@ -303,17 +328,17 @@ impl RowAppendRecordBatchBuilder {
}),
},
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Second,
_) => {
- Ok(Box::new(TimestampSecondBuilder::new()))
- }
-
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => {
- Ok(Box::new(TimestampMillisecondBuilder::new()))
- }
-
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => {
- Ok(Box::new(TimestampMicrosecondBuilder::new()))
- }
-
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => {
- Ok(Box::new(TimestampNanosecondBuilder::new()))
+ Ok(Box::new(TimestampSecondBuilder::with_capacity(capacity)))
}
+
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Millisecond, _) => Ok(
+ Box::new(TimestampMillisecondBuilder::with_capacity(capacity)),
+ ),
+
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Microsecond, _) => Ok(
+ Box::new(TimestampMicrosecondBuilder::with_capacity(capacity)),
+ ),
+
arrow_schema::DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, _) => Ok(
+ Box::new(TimestampNanosecondBuilder::with_capacity(capacity)),
+ ),
dt => Err(Error::IllegalArgument {
message: format!("Unsupported data type: {dt:?}"),
}),
@@ -1701,7 +1726,8 @@ mod tests {
// Test valid builder creation with precision=10, scale=2
let mut builder =
-
RowAppendRecordBatchBuilder::create_builder(&ArrowDataType::Decimal128(10,
2)).unwrap();
+
RowAppendRecordBatchBuilder::create_builder(&ArrowDataType::Decimal128(10, 2),
256)
+ .unwrap();
let decimal_builder = builder
.as_any_mut()
.downcast_mut::<Decimal128Builder>()
@@ -1712,7 +1738,7 @@ mod tests {
// Test error case: invalid precision/scale
let result =
-
RowAppendRecordBatchBuilder::create_builder(&ArrowDataType::Decimal128(100,
50));
+
RowAppendRecordBatchBuilder::create_builder(&ArrowDataType::Decimal128(100,
50), 256);
assert!(result.is_err());
}