This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new b5e4290f8 perf: Optimize lpad/rpad to remove unnecessary memory allocations per element (#2963)
b5e4290f8 is described below
commit b5e4290f82acaa01222e1f03a17bf7f1afc4f68a
Author: Andy Grove <[email protected]>
AuthorDate: Tue Dec 23 14:20:17 2025 -0700
perf: Optimize lpad/rpad to remove unnecessary memory allocations per
element (#2963)
---
native/spark-expr/Cargo.toml | 4 +
native/spark-expr/benches/padding.rs | 121 +++++++++++++++++++++
.../char_varchar_utils/read_side_padding.rs | 110 +++++++++++++------
3 files changed, 204 insertions(+), 31 deletions(-)
diff --git a/native/spark-expr/Cargo.toml b/native/spark-expr/Cargo.toml
index c973a5b37..ea89c4320 100644
--- a/native/spark-expr/Cargo.toml
+++ b/native/spark-expr/Cargo.toml
@@ -76,6 +76,10 @@ harness = false
name = "bloom_filter_agg"
harness = false
+[[bench]]
+name = "padding"
+harness = false
+
[[test]]
name = "test_udf_registration"
path = "tests/spark_expr_reg.rs"
diff --git a/native/spark-expr/benches/padding.rs b/native/spark-expr/benches/padding.rs
new file mode 100644
index 000000000..cd9e28f2d
--- /dev/null
+++ b/native/spark-expr/benches/padding.rs
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::array::builder::StringBuilder;
+use arrow::array::ArrayRef;
+use criterion::{criterion_group, criterion_main, Criterion};
+use datafusion::common::ScalarValue;
+use datafusion::physical_plan::ColumnarValue;
+use datafusion_comet_spark_expr::{spark_lpad, spark_rpad};
+use std::hint::black_box;
+use std::sync::Arc;
+
+fn create_string_array(size: usize) -> ArrayRef {
+ let mut builder = StringBuilder::new();
+ for i in 0..size {
+ if i % 10 == 0 {
+ builder.append_null();
+ } else {
+ builder.append_value(format!("string{}", i % 100));
+ }
+ }
+ Arc::new(builder.finish())
+}
+
+fn criterion_benchmark(c: &mut Criterion) {
+ let size = 8192;
+ let string_array = create_string_array(size);
+
+ // lpad with default padding (space)
+ c.bench_function("spark_lpad: default padding", |b| {
+ let args = vec![
+ ColumnarValue::Array(Arc::clone(&string_array)),
+ ColumnarValue::Scalar(ScalarValue::Int32(Some(20))),
+ ];
+ b.iter(|| black_box(spark_lpad(black_box(&args)).unwrap()))
+ });
+
+ // lpad with custom padding character
+ c.bench_function("spark_lpad: custom padding", |b| {
+ let args = vec![
+ ColumnarValue::Array(Arc::clone(&string_array)),
+ ColumnarValue::Scalar(ScalarValue::Int32(Some(20))),
+ ColumnarValue::Scalar(ScalarValue::Utf8(Some("*".to_string()))),
+ ];
+ b.iter(|| black_box(spark_lpad(black_box(&args)).unwrap()))
+ });
+
+ // rpad with default padding (space)
+ c.bench_function("spark_rpad: default padding", |b| {
+ let args = vec![
+ ColumnarValue::Array(Arc::clone(&string_array)),
+ ColumnarValue::Scalar(ScalarValue::Int32(Some(20))),
+ ];
+ b.iter(|| black_box(spark_rpad(black_box(&args)).unwrap()))
+ });
+
+ // rpad with custom padding character
+ c.bench_function("spark_rpad: custom padding", |b| {
+ let args = vec![
+ ColumnarValue::Array(Arc::clone(&string_array)),
+ ColumnarValue::Scalar(ScalarValue::Int32(Some(20))),
+ ColumnarValue::Scalar(ScalarValue::Utf8(Some("*".to_string()))),
+ ];
+ b.iter(|| black_box(spark_rpad(black_box(&args)).unwrap()))
+ });
+
+ // lpad with multi-character padding string
+ c.bench_function("spark_lpad: multi-char padding", |b| {
+ let args = vec![
+ ColumnarValue::Array(Arc::clone(&string_array)),
+ ColumnarValue::Scalar(ScalarValue::Int32(Some(20))),
+ ColumnarValue::Scalar(ScalarValue::Utf8(Some("abc".to_string()))),
+ ];
+ b.iter(|| black_box(spark_lpad(black_box(&args)).unwrap()))
+ });
+
+ // rpad with multi-character padding string
+ c.bench_function("spark_rpad: multi-char padding", |b| {
+ let args = vec![
+ ColumnarValue::Array(Arc::clone(&string_array)),
+ ColumnarValue::Scalar(ScalarValue::Int32(Some(20))),
+ ColumnarValue::Scalar(ScalarValue::Utf8(Some("abc".to_string()))),
+ ];
+ b.iter(|| black_box(spark_rpad(black_box(&args)).unwrap()))
+ });
+
+ // lpad with truncation (target length shorter than some strings)
+ c.bench_function("spark_lpad: with truncation", |b| {
+ let args = vec![
+ ColumnarValue::Array(Arc::clone(&string_array)),
+ ColumnarValue::Scalar(ScalarValue::Int32(Some(5))),
+ ];
+ b.iter(|| black_box(spark_lpad(black_box(&args)).unwrap()))
+ });
+
+ // rpad with truncation (target length shorter than some strings)
+ c.bench_function("spark_rpad: with truncation", |b| {
+ let args = vec![
+ ColumnarValue::Array(Arc::clone(&string_array)),
+ ColumnarValue::Scalar(ScalarValue::Int32(Some(5))),
+ ];
+ b.iter(|| black_box(spark_rpad(black_box(&args)).unwrap()))
+ });
+}
+
+criterion_group!(benches, criterion_benchmark);
+criterion_main!(benches);
diff --git a/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs b/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs
index 89485ddec..000b4810e 100644
--- a/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs
+++ b/native/spark-expr/src/static_invoke/char_varchar_utils/read_side_padding.rs
@@ -194,6 +194,10 @@ fn spark_read_side_padding_internal<T: OffsetSizeTrait>(
is_left_pad: bool,
) -> Result<ColumnarValue, DataFusionError> {
let string_array = as_generic_string_array::<T>(array)?;
+
+ // Pre-compute pad characters once to avoid repeated iteration
+ let pad_chars: Vec<char> = pad_string.chars().collect();
+
match pad_type {
ColumnarValue::Array(array_int) => {
let int_pad_array = array_int.as_primitive::<Int32Type>();
@@ -203,18 +207,24 @@ fn spark_read_side_padding_internal<T: OffsetSizeTrait>(
string_array.len() * int_pad_array.len(),
);
+ // Reusable buffer to avoid per-element allocations
+ let mut buffer = String::with_capacity(pad_chars.len());
+
for (string, length) in string_array.iter().zip(int_pad_array) {
let length = length.unwrap();
match string {
Some(string) => {
if length >= 0 {
- builder.append_value(add_padding_string(
- string.parse().unwrap(),
+ buffer.clear();
+ write_padded_string(
+ &mut buffer,
+ string,
length as usize,
truncate,
- pad_string,
+ &pad_chars,
is_left_pad,
- )?)
+ );
+ builder.append_value(&buffer);
} else {
builder.append_value("");
}
@@ -232,15 +242,23 @@ fn spark_read_side_padding_internal<T: OffsetSizeTrait>(
string_array.len() * length,
);
+ // Reusable buffer to avoid per-element allocations
+ let mut buffer = String::with_capacity(length);
+
for string in string_array.iter() {
match string {
- Some(string) => builder.append_value(add_padding_string(
- string.parse().unwrap(),
- length,
- truncate,
- pad_string,
- is_left_pad,
- )?),
+ Some(string) => {
+ buffer.clear();
+ write_padded_string(
+ &mut buffer,
+ string,
+ length,
+ truncate,
+ &pad_chars,
+ is_left_pad,
+ );
+ builder.append_value(&buffer);
+ }
_ => builder.append_null(),
}
}
@@ -249,44 +267,74 @@ fn spark_read_side_padding_internal<T: OffsetSizeTrait>(
}
}
-fn add_padding_string(
- string: String,
+/// Writes a padded string to the provided buffer, avoiding allocations.
+///
+/// The buffer is assumed to be cleared before calling this function.
+/// Padding characters are written directly to the buffer without intermediate allocations.
+#[inline]
+fn write_padded_string(
+ buffer: &mut String,
+ string: &str,
length: usize,
truncate: bool,
- pad_string: &str,
+ pad_chars: &[char],
is_left_pad: bool,
-) -> Result<String, DataFusionError> {
- // It looks Spark's UTF8String is closer to chars rather than graphemes
+) {
+ // Spark's UTF8String uses char count, not grapheme count
// https://stackoverflow.com/a/46290728
let char_len = string.chars().count();
+
if length <= char_len {
if truncate {
+ // Find byte index for the truncation point
let idx = string
.char_indices()
.nth(length)
.map(|(i, _)| i)
.unwrap_or(string.len());
- match string[..idx].parse() {
- Ok(string) => Ok(string),
- Err(err) => Err(DataFusionError::Internal(format!(
- "Failed adding padding string {} error {:}",
- string, err
- ))),
- }
+ buffer.push_str(&string[..idx]);
} else {
- Ok(string)
+ buffer.push_str(string);
}
} else {
let pad_needed = length - char_len;
- let pad: String = pad_string.chars().cycle().take(pad_needed).collect();
- let mut result = String::with_capacity(string.len() + pad.len());
+
if is_left_pad {
- result.push_str(&pad);
- result.push_str(&string);
+ // Write padding first, then string
+ write_padding_chars(buffer, pad_chars, pad_needed);
+ buffer.push_str(string);
} else {
- result.push_str(&string);
- result.push_str(&pad);
+ // Write string first, then padding
+ buffer.push_str(string);
+ write_padding_chars(buffer, pad_chars, pad_needed);
+ }
+ }
+}
+
+/// Writes `count` characters from the cycling pad pattern directly to the buffer.
+#[inline]
+fn write_padding_chars(buffer: &mut String, pad_chars: &[char], count: usize) {
+ if pad_chars.is_empty() {
+ return;
+ }
+
+ // Optimize for the common single-character padding case
+ if pad_chars.len() == 1 {
+ let ch = pad_chars[0];
+ for _ in 0..count {
+ buffer.push(ch);
+ }
+ } else {
+ // Multi-character padding: cycle through pad_chars
+ let mut remaining = count;
+ while remaining > 0 {
+ for &ch in pad_chars {
+ if remaining == 0 {
+ break;
+ }
+ buffer.push(ch);
+ remaining -= 1;
+ }
}
- Ok(result)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]