This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new a212cf4170 perf(arrow-ipc): Add writer benchmarks for dictionaries (#10122)
a212cf4170 is described below

commit a212cf4170d7ccdf01c05ab7fb31697b899a4e1c
Author: Jake Dern <[email protected]>
AuthorDate: Wed Jun 17 07:54:01 2026 -0700

    perf(arrow-ipc): Add writer benchmarks for dictionaries (#10122)
    
    # Which issue does this PR close?
    
    - Closes #10119
    
    # Rationale for this change
    
    This PR adds writer benchmarks for dictionaries so that we can measure
    the performance impact of code changes on those code paths.
    
    # What changes are included in this PR?
    
    Three new benchmarks:
    
    - StreamWriter benchmark for dictionaries
    - StreamWriter benchmark for delta dictionaries
    - FileWriter benchmark for delta dictionaries
    
    # Are these changes tested?
    
    Yes. This PR only adds benchmarks, which I ran locally.
    
    # Are there any user-facing changes?
    
    No.
---
 arrow-ipc/benches/ipc_writer.rs | 122 +++++++++++++++++++++++++++++++++++++++-
 1 file changed, 119 insertions(+), 3 deletions(-)

diff --git a/arrow-ipc/benches/ipc_writer.rs b/arrow-ipc/benches/ipc_writer.rs
index eda7e3c58f..5050ff6cd1 100644
--- a/arrow-ipc/benches/ipc_writer.rs
+++ b/arrow-ipc/benches/ipc_writer.rs
@@ -15,10 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow_array::builder::{Date32Builder, Decimal128Builder, Int32Builder};
-use arrow_array::{RecordBatch, builder::StringBuilder};
+use arrow_array::RecordBatch;
+use arrow_array::builder::{
+    Date32Builder, Decimal128Builder, Int32Builder, StringBuilder, 
StringDictionaryBuilder,
+};
+use arrow_array::types::UInt32Type;
 use arrow_ipc::CompressionType;
-use arrow_ipc::writer::{FileWriter, IpcWriteOptions, StreamWriter};
+use arrow_ipc::writer::{DictionaryHandling, FileWriter, IpcWriteOptions, 
StreamWriter};
 use arrow_schema::{DataType, Field, Schema};
 use criterion::{Criterion, criterion_group, criterion_main};
 use std::sync::Arc;
@@ -69,6 +72,119 @@ fn criterion_benchmark(c: &mut Criterion) {
             writer.finish().unwrap();
         })
     });
+
+    group.bench_function("StreamWriter/write_10/dict", |b| {
+        let batches = create_unique_dict_batches(10, 8192);
+        let schema = batches[0].schema();
+        let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
+        b.iter(move || {
+            buffer.clear();
+            let mut writer = StreamWriter::try_new(&mut buffer, 
schema.as_ref()).unwrap();
+            for batch in &batches {
+                writer.write(batch).unwrap();
+            }
+            writer.finish().unwrap();
+        })
+    });
+
+    group.bench_function("StreamWriter/write_10/dict/delta", |b| {
+        let batches = create_delta_dict_batches(10, 8192);
+        let schema = batches[0].schema();
+        let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
+        let options =
+            
IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta);
+
+        b.iter(move || {
+            buffer.clear();
+
+            let mut writer =
+                StreamWriter::try_new_with_options(&mut buffer, 
schema.as_ref(), options.clone())
+                    .unwrap();
+
+            for batch in &batches {
+                writer.write(batch).unwrap();
+            }
+
+            writer.finish().unwrap();
+        })
+    });
+
+    // The file writer rejects dictionary replacement, so only the delta case is
+    // exercised here (growing dictionaries that are prefixes of one another).
+    group.bench_function("FileWriter/write_10/dict/delta", |b| {
+        let batches = create_delta_dict_batches(10, 8192);
+        let schema = batches[0].schema();
+        let mut buffer = Vec::with_capacity(2 * 1024 * 1024);
+        let options =
+            
IpcWriteOptions::default().with_dictionary_handling(DictionaryHandling::Delta);
+
+        b.iter(move || {
+            buffer.clear();
+
+            let mut writer =
+                FileWriter::try_new_with_options(&mut buffer, schema.as_ref(), 
options.clone())
+                    .unwrap();
+
+            for batch in &batches {
+                writer.write(batch).unwrap();
+            }
+
+            writer.finish().unwrap();
+        })
+    });
+}
+
+/// Build `n` record batches with a single dictionary column whose dictionary
+/// grows across batches. A single builder is reused with `finish_preserve_values`
+/// so each batch's dictionary has the previous batch's as a prefix which allows
+/// us to emit deltas.
+fn create_delta_dict_batches(n: usize, num_rows: usize) -> Vec<RecordBatch> {
+    let schema = Arc::new(Schema::new(vec![Field::new(
+        "d0",
+        DataType::Dictionary(Box::new(DataType::UInt32), 
Box::new(DataType::Utf8)),
+        false,
+    )]));
+    let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
+
+    let mut batches = Vec::with_capacity(n);
+    for i in 0..n {
+        // 3/4 of the rows reuse values shared by every batch, the other 1/4
+        // introduce values unique to this batch which extends the dictionary.
+        for r in 0..num_rows {
+            if r < num_rows / 4 {
+                builder.append_value(format!("batch {i} value {}", r));
+            } else {
+                builder.append_value(format!("shared {r}"));
+            }
+        }
+
+        // Preserve the values builder so the dictionary accumulates across batches.
+        let dict = builder.finish_preserve_values();
+        batches.push(RecordBatch::try_new(schema.clone(), 
vec![Arc::new(dict)]).unwrap());
+    }
+
+    batches
+}
+
+/// Build `n` record batches each with a completely distinct dictionary for each batch.
+fn create_unique_dict_batches(n: usize, num_rows: usize) -> Vec<RecordBatch> {
+    let schema = Arc::new(Schema::new(vec![Field::new(
+        "d0",
+        DataType::Dictionary(Box::new(DataType::UInt32), 
Box::new(DataType::Utf8)),
+        false,
+    )]));
+
+    let mut batches = Vec::with_capacity(n);
+    for i in 0..n {
+        let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
+        for r in 0..num_rows {
+            builder.append_value(format!("batch {i} value {}", r % (num_rows / 
2)));
+        }
+        let dict = builder.finish();
+        batches.push(RecordBatch::try_new(schema.clone(), 
vec![Arc::new(dict)]).unwrap());
+    }
+
+    batches
 }
 
 fn create_batch(num_rows: usize, allow_nulls: bool) -> RecordBatch {

Reply via email to