This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new b9dd6f0031 [arrow-flight] Optimize flight, remove some allocations, 
add dictionary focused benchmarks (#10126)
b9dd6f0031 is described below

commit b9dd6f003164f3b8fad9936dcdb6b1da76c5283a
Author: RIchard Baah <[email protected]>
AuthorDate: Thu Jun 18 15:45:14 2026 -0400

    [arrow-flight] Optimize flight, remove some allocations, add dictionary 
focused benchmarks (#10126)
    
    # Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax.
    -->
    
    Part of
    
    - #10125
    
    # Rationale for this change
    
    <!--
    Why are you proposing this change? If this is already explained clearly
    in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand
    your changes and offer better suggestions for fixes.
    -->
    Going through the arrow-flight codebase I noticed that by default
    `DictionaryHandling` is set to Hydrate. This means it expands the arrays
    out to their logical form. In other words when the variant is set to
    hydrate, `arrow-ipc::IpcDataGenerator::encode_all_dicts()` never
    actually runs.
    This is important due to the arrow-ipc work that @alamb, @JakeDern, and
    I have been working on. [Efforts are being made to
    optimize](https://github.com/apache/arrow-rs/pull/10044#issuecomment-4675826329)
    arrow-ipc's use of dictionaries. This PR allows those changes to be
    visible through arrow-flight benchmarks.
    # What changes are included in this PR?
    This PR adds a benchmark for arrow-flight's `do_put` endpoint using
    dictionary arrays, measuring the latency difference between the two
    DictionaryHandling variants.
    <!--
    There is no need to duplicate the description in the issue here but it
    is sometimes worth providing a summary of the individual changes in this
    PR.
    -->
    
    # Are these changes tested?
    changes are benchmarks
    <!--
    We typically require tests for all PRs in order to:
    1. Prevent the code from being accidentally broken by subsequent changes
    2. Serve as another way to document the expected behavior of the code
    
    If tests are not included in your PR, please explain why (for example,
    are they covered by existing tests)?
    
    If this PR claims a performance improvement, please include evidence
    such as benchmark results.
    -->
    
    # Are there any user-facing changes?
    no
    <!--
    If there are user-facing changes then we may require documentation to be
    updated before approving the PR.
    
    If there are any breaking changes to public APIs, please call them out.
    -->
---
 arrow-flight/benches/common/mod.rs | 10 +++----
 arrow-flight/benches/flight.rs     | 61 +++++++++++++++++++++++++++++++++++---
 arrow-flight/src/encode.rs         | 53 +++++++++++++++++----------------
 3 files changed, 89 insertions(+), 35 deletions(-)

diff --git a/arrow-flight/benches/common/mod.rs 
b/arrow-flight/benches/common/mod.rs
index a55e1dd2f7..b716d3f31f 100644
--- a/arrow-flight/benches/common/mod.rs
+++ b/arrow-flight/benches/common/mod.rs
@@ -38,12 +38,10 @@ use tonic::{
 
 pub type Builder = fn(usize) -> ArrayRef;
 
-pub const TYPES: &[(&str, Builder)] = &[
-    ("fixed", fixed),
-    ("nested", nested),
-    ("variable", variable),
-    ("dict", dict),
-];
+pub const TYPES: &[(&str, Builder)] =
+    &[("fixed", fixed), ("nested", nested), ("variable", variable)];
+
+pub const DICT_TYPES: &[(&str, Builder)] = &[("dict", dict)];
 
 fn fixed(n: usize) -> ArrayRef {
     Arc::new(Int64Array::from_iter_values(0..n as i64))
diff --git a/arrow-flight/benches/flight.rs b/arrow-flight/benches/flight.rs
index 4841e9dd98..db03380bb0 100644
--- a/arrow-flight/benches/flight.rs
+++ b/arrow-flight/benches/flight.rs
@@ -16,16 +16,19 @@
 // under the License.
 
 use arrow_array::RecordBatch;
-use arrow_flight::{FlightClient, FlightData, encode::FlightDataEncoderBuilder};
+use arrow_flight::{
+    FlightClient, FlightData,
+    encode::{DictionaryHandling, FlightDataEncoderBuilder},
+};
 use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, 
criterion_main};
 use futures::TryStreamExt;
 use tonic::transport::Channel;
 
 mod common;
-use common::{TYPES, build_batch, start_server};
+use common::{DICT_TYPES, TYPES, build_batch, start_server};
 
 const ROWS: [usize; 2] = [8 * 1024, 64 * 1024];
-const COLS: [usize; 2] = [1, 8];
+const COLS: [usize; 3] = [1, 4, 8];
 
 fn bench_encode(c: &mut Criterion) {
     let rt = tokio::runtime::Runtime::new().unwrap();
@@ -83,5 +86,55 @@ fn bench_roundtrip(c: &mut Criterion) {
     }
 }
 
-criterion_group!(benches, bench_encode, bench_roundtrip);
+fn bench_do_put_dictionary(c: &mut Criterion) {
+    let rt = tokio::runtime::Runtime::new().unwrap();
+    let (channel, _) = rt.block_on(start_server());
+    let mut g = c.benchmark_group("do_put_dictionary");
+
+    for &(name, build) in DICT_TYPES {
+        for &rows in &ROWS {
+            for &cols in &COLS {
+                let batch = build_batch(name, rows, cols, build);
+                g.throughput(Throughput::Bytes(batch.get_array_memory_size() 
as u64));
+
+                for (label, handling) in [
+                    ("hydrate", DictionaryHandling::Hydrate),
+                    ("resend", DictionaryHandling::Resend),
+                ] {
+                    let frames: Vec<FlightData> = rt
+                        .block_on(
+                            FlightDataEncoderBuilder::new()
+                                .with_dictionary_handling(handling)
+                                
.build(futures::stream::iter([Ok(batch.clone())]))
+                                .try_collect(),
+                        )
+                        .unwrap();
+                    let id = BenchmarkId::new(format!("{name}/{label}"), 
format!("{rows}x{cols}"));
+                    g.bench_function(id, |b| {
+                        b.to_async(&rt).iter_batched(
+                            || (FlightClient::new(channel.clone()), 
frames.clone()),
+                            |(mut client, frames)| async move {
+                                client
+                                    
.do_put(futures::stream::iter(frames.into_iter().map(Ok)))
+                                    .await
+                                    .unwrap()
+                                    .try_collect::<Vec<_>>()
+                                    .await
+                                    .unwrap();
+                            },
+                            criterion::BatchSize::SmallInput,
+                        );
+                    });
+                }
+            }
+        }
+    }
+}
+
+criterion_group!(
+    benches,
+    bench_encode,
+    bench_roundtrip,
+    bench_do_put_dictionary
+);
 criterion_main!(benches);
diff --git a/arrow-flight/src/encode.rs b/arrow-flight/src/encode.rs
index f2d0d25586..6c17669956 100644
--- a/arrow-flight/src/encode.rs
+++ b/arrow-flight/src/encode.rs
@@ -329,6 +329,7 @@ impl FlightDataEncoder {
     }
 
     /// Place the `FlightData` in the queue to send
+    #[inline]
     fn queue_message(&mut self, mut data: FlightData) {
         if let Some(descriptor) = self.descriptor.take() {
             data.flight_descriptor = Some(descriptor);
@@ -336,13 +337,6 @@ impl FlightDataEncoder {
         self.queue.push_back(data);
     }
 
-    /// Place the `FlightData` in the queue to send
-    fn queue_messages(&mut self, datas: impl IntoIterator<Item = FlightData>) {
-        for data in datas {
-            self.queue_message(data)
-        }
-    }
-
     /// Encodes schema as a [`FlightData`] in self.queue.
     /// Updates `self.schema` and returns the new schema
     fn encode_schema(&mut self, schema: &SchemaRef) -> SchemaRef {
@@ -381,8 +375,9 @@ impl FlightDataEncoder {
 
         for batch in split_batch_for_grpc_response(batch, 
self.max_flight_data_size) {
             let (flight_dictionaries, flight_batch) = 
self.encoder.encode_batch(&batch)?;
-
-            self.queue_messages(flight_dictionaries);
+            for dict in flight_dictionaries {
+                self.queue_message(dict);
+            }
             self.queue_message(flight_batch);
         }
 
@@ -671,7 +666,7 @@ fn prepare_schema_for_flight(
 fn split_batch_for_grpc_response(
     batch: RecordBatch,
     max_flight_data_size: usize,
-) -> Vec<RecordBatch> {
+) -> impl Iterator<Item = RecordBatch> {
     let size = batch
         .columns()
         .iter()
@@ -680,18 +675,20 @@ fn split_batch_for_grpc_response(
 
     let n_batches =
         (size / max_flight_data_size + usize::from(size % max_flight_data_size 
!= 0)).max(1);
-    let rows_per_batch = (batch.num_rows() / n_batches).max(1);
-    let mut out = Vec::with_capacity(n_batches + 1);
-
+    let num_rows = batch.num_rows();
+    let rows_per_batch = (num_rows / n_batches).max(1);
     let mut offset = 0;
-    while offset < batch.num_rows() {
-        let length = (rows_per_batch).min(batch.num_rows() - offset);
-        out.push(batch.slice(offset, length));
 
-        offset += length;
-    }
-
-    out
+    std::iter::from_fn(move || {
+        if offset < num_rows {
+            let length = rows_per_batch.min(num_rows - offset);
+            let slice = batch.slice(offset, length);
+            offset += length;
+            Some(slice)
+        } else {
+            None
+        }
+    })
 }
 
 /// The data needed to encode a stream of flight data, holding on to
@@ -724,7 +721,10 @@ impl FlightIpcEncoder {
 
     /// Convert a `RecordBatch` to a Vec of `FlightData` representing
     /// dictionaries and a `FlightData` representing the batch
-    fn encode_batch(&mut self, batch: &RecordBatch) -> 
Result<(Vec<FlightData>, FlightData)> {
+    fn encode_batch(
+        &mut self,
+        batch: &RecordBatch,
+    ) -> Result<(impl Iterator<Item = FlightData> + use<>, FlightData)> {
         let (encoded_dictionaries, encoded_batch) = self.data_gen.encode(
             batch,
             &mut self.dictionary_tracker,
@@ -732,7 +732,7 @@ impl FlightIpcEncoder {
             &mut self.compression_context,
         )?;
 
-        let flight_dictionaries = 
encoded_dictionaries.into_iter().map(Into::into).collect();
+        let flight_dictionaries = encoded_dictionaries.into_iter().map(|e| 
e.into());
         let flight_batch = encoded_batch.into();
 
         Ok((flight_dictionaries, flight_batch))
@@ -1858,7 +1858,8 @@ mod tests {
         let c = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]);
         let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c) as 
ArrayRef)])
             .expect("cannot create record batch");
-        let split = split_batch_for_grpc_response(batch.clone(), 
max_flight_data_size);
+        let split: Vec<_> =
+            split_batch_for_grpc_response(batch.clone(), 
max_flight_data_size).collect();
         assert_eq!(split.len(), 1);
         assert_eq!(batch, split[0]);
 
@@ -1868,7 +1869,8 @@ mod tests {
         let c = UInt8Array::from((0..n_rows).map(|i| (i % 256) as 
u8).collect::<Vec<_>>());
         let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c) as 
ArrayRef)])
             .expect("cannot create record batch");
-        let split = split_batch_for_grpc_response(batch.clone(), 
max_flight_data_size);
+        let split: Vec<_> =
+            split_batch_for_grpc_response(batch.clone(), 
max_flight_data_size).collect();
         assert_eq!(split.len(), 3);
         assert_eq!(
             split.iter().map(|batch| batch.num_rows()).sum::<usize>(),
@@ -1912,7 +1914,8 @@ mod tests {
 
         let input_rows = batch.num_rows();
 
-        let split = split_batch_for_grpc_response(batch.clone(), 
max_flight_data_size_bytes);
+        let split: Vec<_> =
+            split_batch_for_grpc_response(batch.clone(), 
max_flight_data_size_bytes).collect();
         let sizes: Vec<_> = split.iter().map(RecordBatch::num_rows).collect();
         let output_rows: usize = sizes.iter().sum();
 

Reply via email to