This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new a212cf4170 perf(arrow-ipc): Add writer benchmarks for dictionaries
(#10122)
a212cf4170 is described below
commit a212cf4170d7ccdf01c05ab7fb31697b899a4e1c
Author: Jake Dern <[email protected]>
AuthorDate: Wed Jun 17 07:54:01 2026 -0700
perf(arrow-ipc): Add writer benchmarks for dictionaries (#10122)
# Which issue does this PR close?
- Closes #10119
# Rationale for this change
This PR adds writer benchmarks for dictionaries so that we can measure
the performance impact of code changes on those code paths.
# What changes are included in this PR?
Three new benchmarks:
- StreamWriter benchmark for dictionaries (a distinct dictionary in every batch)
- StreamWriter benchmark for delta dictionaries
- FileWriter benchmark for delta dictionaries
# Are these changes tested?
Yes. This PR only adds benchmarks, which I ran locally.
# Are there any user-facing changes?
No.
---
arrow-ipc/benches/ipc_writer.rs | 122 +++++++++++++++++++++++++++++++++++++++-
1 file changed, 119 insertions(+), 3 deletions(-)
diff --git a/arrow-ipc/benches/ipc_writer.rs b/arrow-ipc/benches/ipc_writer.rs
index eda7e3c58f..5050ff6cd1 100644
--- a/arrow-ipc/benches/ipc_writer.rs
+++ b/arrow-ipc/benches/ipc_writer.rs
@@ -15,10 +15,13 @@
// specific language governing permissions and limitations
// under the License.
-use arrow_array::builder::{Date32Builder, Decimal128Builder, Int32Builder};
-use arrow_array::{RecordBatch, builder::StringBuilder};
+use arrow_array::RecordBatch;
+use arrow_array::builder::{
+ Date32Builder, Decimal128Builder, Int32Builder, StringBuilder,
StringDictionaryBuilder,
+};
+use arrow_array::types::UInt32Type;
use arrow_ipc::CompressionType;
-use arrow_ipc::writer::{FileWriter, IpcWriteOptions, StreamWriter};
+use arrow_ipc::writer::{DictionaryHandling, FileWriter, IpcWriteOptions,
StreamWriter};
use arrow_schema::{DataType, Field, Schema};
use criterion::{Criterion, criterion_group, criterion_main};
use std::sync::Arc;
@@ -69,6 +72,119 @@ fn criterion_benchmark(c: &mut Criterion) {
writer.finish().unwrap();
})
});
+
+ group.bench_function("StreamWriter/write_10/dict", |b| {
+ let batches = create_unique_dict_batches(10, 8192);
+ let schema = batches[0].schema();
+ let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
+ b.iter(move || {
+ buffer.clear();
+ let mut writer = StreamWriter::try_new(&mut buffer,
schema.as_ref()).unwrap();
+ for batch in &batches {
+ writer.write(batch).unwrap();
+ }
+ writer.finish().unwrap();
+ })
+ });
+
+ group.bench_function("StreamWriter/write_10/dict/delta", |b| {
+ let batches = create_delta_dict_batches(10, 8192);
+ let schema = batches[0].schema();
+ let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
+ let options =
+
IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta);
+
+ b.iter(move || {
+ buffer.clear();
+
+ let mut writer =
+ StreamWriter::try_new_with_options(&mut buffer,
schema.as_ref(), options.clone())
+ .unwrap();
+
+ for batch in &batches {
+ writer.write(batch).unwrap();
+ }
+
+ writer.finish().unwrap();
+ })
+ });
+
+ // The file writer rejects dictionary replacement, so only the delta case
is
+ // exercised here (growing dictionaries that are prefixes of one another).
+ group.bench_function("FileWriter/write_10/dict/delta", |b| {
+ let batches = create_delta_dict_batches(10, 8192);
+ let schema = batches[0].schema();
+ let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
+ let options =
+
IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta);
+
+ b.iter(move || {
+ buffer.clear();
+
+ let mut writer =
+ FileWriter::try_new_with_options(&mut buffer, schema.as_ref(),
options.clone())
+ .unwrap();
+
+ for batch in &batches {
+ writer.write(batch).unwrap();
+ }
+
+ writer.finish().unwrap();
+ })
+ });
+}
+
+/// Build `n` record batches with a single dictionary column whose dictionary
+/// grows across batches. A single builder is reused with
`finish_preserve_values`
+/// so each batch's dictionary has the previous batch's as a prefix which
allows
+/// us to emit deltas.
+fn create_delta_dict_batches(n: usize, num_rows: usize) -> Vec<RecordBatch> {
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "d0",
+ DataType::Dictionary(Box::new(DataType::UInt32),
Box::new(DataType::Utf8)),
+ false,
+ )]));
+ let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
+
+ let mut batches = Vec::with_capacity(n);
+ for i in 0..n {
+ // 3/4 of the rows reuse values shared by every batch, the other 1/4
+ // introduce values unique to this batch which extends the dictionary.
+ for r in 0..num_rows {
+ if r < num_rows / 4 {
+ builder.append_value(format!("batch {i} value {}", r));
+ } else {
+ builder.append_value(format!("shared {r}"));
+ }
+ }
+
+ // Preserve the values builder so the dictionary accumulates across
batches.
+ let dict = builder.finish_preserve_values();
+ batches.push(RecordBatch::try_new(schema.clone(),
vec![Arc::new(dict)]).unwrap());
+ }
+
+ batches
+}
+
+/// Builds `n` record batches holding one non-nullable, `UInt32`-keyed string
+/// dictionary column named `d0`, where every batch has its own dictionary.
+///
+/// Each batch uses a fresh [`StringDictionaryBuilder`], and every value embeds
+/// the batch index, so no value is shared between batches. Within a batch the
+/// rows cycle through `num_rows / 2` distinct values (at least one), so each
+/// value appears more than once when `num_rows >= 2`.
+fn create_unique_dict_batches(n: usize, num_rows: usize) -> Vec<RecordBatch> {
+    let schema = Arc::new(Schema::new(vec![Field::new(
+        "d0",
+        DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
+        false,
+    )]));
+    // Half as many distinct values as rows, so values repeat within a batch.
+    // Clamped to 1: for `num_rows == 1` the plain `num_rows / 2` is 0 and the
+    // `%` below would panic with a division by zero.
+    let distinct_values = (num_rows / 2).max(1);
+
+    let mut batches = Vec::with_capacity(n);
+    for i in 0..n {
+        // A fresh builder per batch gives each batch an independent dictionary.
+        let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
+        for r in 0..num_rows {
+            builder.append_value(format!("batch {i} value {}", r % distinct_values));
+        }
+        let dict = builder.finish();
+        batches.push(RecordBatch::try_new(schema.clone(), vec![Arc::new(dict)]).unwrap());
+    }
+
+    batches
}
fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch {