This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new b9dd6f0031 [arrow-flight] Optimize flight, remove some allocations,
add dictionary focused benchmarks (#10126)
b9dd6f0031 is described below
commit b9dd6f003164f3b8fad9936dcdb6b1da76c5283a
Author: RIchard Baah <[email protected]>
AuthorDate: Thu Jun 18 15:45:14 2026 -0400
[arrow-flight] Optimize flight, remove some allocations, add dictionary
focused benchmarks (#10126)
# Which issue does this PR close?
<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax.
-->
Part of
- #10125
# Rationale for this change
<!--
Why are you proposing this change? If this is already explained clearly
in the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand
your changes and offer better suggestions for fixes.
-->
Going through the arrow-flight codebase, I noticed that
`DictionaryHandling` defaults to `Hydrate`. With this setting, dictionary
arrays are expanded out to their logical form. In other words, when the
variant is set to `Hydrate`,
`arrow-ipc::IpcDataGenerator::encode_all_dicts()` never actually runs.
This is important because of the arrow-ipc work that @alamb, @JakeDern,
and I have been doing. [Efforts are being made to
optimize](https://github.com/apache/arrow-rs/pull/10044#issuecomment-4675826329)
arrow-ipc's use of dictionaries. This PR makes those changes visible
through the arrow-flight benchmarks.
# What changes are included in this PR?
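This PR adds a benchmark for arrow-flight's `do_put` endpoint using
dictionary arrays, measuring the latency difference between the two
`DictionaryHandling` variants (`Hydrate` and `Resend`).
It also removes some intermediate `Vec` allocations in
`arrow-flight/src/encode.rs`: `split_batch_for_grpc_response` and
`FlightIpcEncoder::encode_batch` now return iterators instead of
collecting into a `Vec`.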
<!--
There is no need to duplicate the description in the issue here but it
is sometimes worth providing a summary of the individual changes in this
PR.
-->
# Are these changes tested?
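Most of the changes are benchmarks. The existing
`split_batch_for_grpc_response` tests in `encode.rs` were updated to
collect the new iterator return value.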
<!--
We typically require tests for all PRs in order to:
1. Prevent the code from being accidentally broken by subsequent changes
2. Serve as another way to document the expected behavior of the code
If tests are not included in your PR, please explain why (for example,
are they covered by existing tests)?
If this PR claims a performance improvement, please include evidence
such as benchmark results.
-->
# Are there any user-facing changes?
no
<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
If there are any breaking changes to public APIs, please call them out.
-->
---
arrow-flight/benches/common/mod.rs | 10 +++----
arrow-flight/benches/flight.rs | 61 +++++++++++++++++++++++++++++++++++---
arrow-flight/src/encode.rs | 53 +++++++++++++++++----------------
3 files changed, 89 insertions(+), 35 deletions(-)
diff --git a/arrow-flight/benches/common/mod.rs
b/arrow-flight/benches/common/mod.rs
index a55e1dd2f7..b716d3f31f 100644
--- a/arrow-flight/benches/common/mod.rs
+++ b/arrow-flight/benches/common/mod.rs
@@ -38,12 +38,10 @@ use tonic::{
pub type Builder = fn(usize) -> ArrayRef;
-pub const TYPES: &[(&str, Builder)] = &[
- ("fixed", fixed),
- ("nested", nested),
- ("variable", variable),
- ("dict", dict),
-];
+pub const TYPES: &[(&str, Builder)] =
+ &[("fixed", fixed), ("nested", nested), ("variable", variable)];
+
+pub const DICT_TYPES: &[(&str, Builder)] = &[("dict", dict)];
fn fixed(n: usize) -> ArrayRef {
Arc::new(Int64Array::from_iter_values(0..n as i64))
diff --git a/arrow-flight/benches/flight.rs b/arrow-flight/benches/flight.rs
index 4841e9dd98..db03380bb0 100644
--- a/arrow-flight/benches/flight.rs
+++ b/arrow-flight/benches/flight.rs
@@ -16,16 +16,19 @@
// under the License.
use arrow_array::RecordBatch;
-use arrow_flight::{FlightClient, FlightData, encode::FlightDataEncoderBuilder};
+use arrow_flight::{
+ FlightClient, FlightData,
+ encode::{DictionaryHandling, FlightDataEncoderBuilder},
+};
use criterion::{BenchmarkId, Criterion, Throughput, criterion_group,
criterion_main};
use futures::TryStreamExt;
use tonic::transport::Channel;
mod common;
-use common::{TYPES, build_batch, start_server};
+use common::{DICT_TYPES, TYPES, build_batch, start_server};
const ROWS: [usize; 2] = [8 * 1024, 64 * 1024];
-const COLS: [usize; 2] = [1, 8];
+const COLS: [usize; 3] = [1, 4, 8];
fn bench_encode(c: &mut Criterion) {
let rt = tokio::runtime::Runtime::new().unwrap();
@@ -83,5 +86,55 @@ fn bench_roundtrip(c: &mut Criterion) {
}
}
-criterion_group!(benches, bench_encode, bench_roundtrip);
+fn bench_do_put_dictionary(c: &mut Criterion) {
+ let rt = tokio::runtime::Runtime::new().unwrap();
+ let (channel, _) = rt.block_on(start_server());
+ let mut g = c.benchmark_group("do_put_dictionary");
+
+ for &(name, build) in DICT_TYPES {
+ for &rows in &ROWS {
+ for &cols in &COLS {
+ let batch = build_batch(name, rows, cols, build);
+ g.throughput(Throughput::Bytes(batch.get_array_memory_size()
as u64));
+
+ for (label, handling) in [
+ ("hydrate", DictionaryHandling::Hydrate),
+ ("resend", DictionaryHandling::Resend),
+ ] {
+ let frames: Vec<FlightData> = rt
+ .block_on(
+ FlightDataEncoderBuilder::new()
+ .with_dictionary_handling(handling)
+
.build(futures::stream::iter([Ok(batch.clone())]))
+ .try_collect(),
+ )
+ .unwrap();
+ let id = BenchmarkId::new(format!("{name}/{label}"),
format!("{rows}x{cols}"));
+ g.bench_function(id, |b| {
+ b.to_async(&rt).iter_batched(
+ || (FlightClient::new(channel.clone()),
frames.clone()),
+ |(mut client, frames)| async move {
+ client
+
.do_put(futures::stream::iter(frames.into_iter().map(Ok)))
+ .await
+ .unwrap()
+ .try_collect::<Vec<_>>()
+ .await
+ .unwrap();
+ },
+ criterion::BatchSize::SmallInput,
+ );
+ });
+ }
+ }
+ }
+ }
+}
+
+criterion_group!(
+ benches,
+ bench_encode,
+ bench_roundtrip,
+ bench_do_put_dictionary
+);
criterion_main!(benches);
diff --git a/arrow-flight/src/encode.rs b/arrow-flight/src/encode.rs
index f2d0d25586..6c17669956 100644
--- a/arrow-flight/src/encode.rs
+++ b/arrow-flight/src/encode.rs
@@ -329,6 +329,7 @@ impl FlightDataEncoder {
}
/// Place the `FlightData` in the queue to send
+ #[inline]
fn queue_message(&mut self, mut data: FlightData) {
if let Some(descriptor) = self.descriptor.take() {
data.flight_descriptor = Some(descriptor);
@@ -336,13 +337,6 @@ impl FlightDataEncoder {
self.queue.push_back(data);
}
- /// Place the `FlightData` in the queue to send
- fn queue_messages(&mut self, datas: impl IntoIterator<Item = FlightData>) {
- for data in datas {
- self.queue_message(data)
- }
- }
-
/// Encodes schema as a [`FlightData`] in self.queue.
/// Updates `self.schema` and returns the new schema
fn encode_schema(&mut self, schema: &SchemaRef) -> SchemaRef {
@@ -381,8 +375,9 @@ impl FlightDataEncoder {
for batch in split_batch_for_grpc_response(batch,
self.max_flight_data_size) {
let (flight_dictionaries, flight_batch) =
self.encoder.encode_batch(&batch)?;
-
- self.queue_messages(flight_dictionaries);
+ for dict in flight_dictionaries {
+ self.queue_message(dict);
+ }
self.queue_message(flight_batch);
}
@@ -671,7 +666,7 @@ fn prepare_schema_for_flight(
fn split_batch_for_grpc_response(
batch: RecordBatch,
max_flight_data_size: usize,
-) -> Vec<RecordBatch> {
+) -> impl Iterator<Item = RecordBatch> {
let size = batch
.columns()
.iter()
@@ -680,18 +675,20 @@ fn split_batch_for_grpc_response(
let n_batches =
(size / max_flight_data_size + usize::from(size % max_flight_data_size
!= 0)).max(1);
- let rows_per_batch = (batch.num_rows() / n_batches).max(1);
- let mut out = Vec::with_capacity(n_batches + 1);
-
+ let num_rows = batch.num_rows();
+ let rows_per_batch = (num_rows / n_batches).max(1);
let mut offset = 0;
- while offset < batch.num_rows() {
- let length = (rows_per_batch).min(batch.num_rows() - offset);
- out.push(batch.slice(offset, length));
- offset += length;
- }
-
- out
+ std::iter::from_fn(move || {
+ if offset < num_rows {
+ let length = rows_per_batch.min(num_rows - offset);
+ let slice = batch.slice(offset, length);
+ offset += length;
+ Some(slice)
+ } else {
+ None
+ }
+ })
}
/// The data needed to encode a stream of flight data, holding on to
@@ -724,7 +721,10 @@ impl FlightIpcEncoder {
/// Convert a `RecordBatch` to a Vec of `FlightData` representing
/// dictionaries and a `FlightData` representing the batch
- fn encode_batch(&mut self, batch: &RecordBatch) ->
Result<(Vec<FlightData>, FlightData)> {
+ fn encode_batch(
+ &mut self,
+ batch: &RecordBatch,
+ ) -> Result<(impl Iterator<Item = FlightData> + use<>, FlightData)> {
let (encoded_dictionaries, encoded_batch) = self.data_gen.encode(
batch,
&mut self.dictionary_tracker,
@@ -732,7 +732,7 @@ impl FlightIpcEncoder {
&mut self.compression_context,
)?;
- let flight_dictionaries =
encoded_dictionaries.into_iter().map(Into::into).collect();
+ let flight_dictionaries = encoded_dictionaries.into_iter().map(|e|
e.into());
let flight_batch = encoded_batch.into();
Ok((flight_dictionaries, flight_batch))
@@ -1858,7 +1858,8 @@ mod tests {
let c = UInt32Array::from(vec![1, 2, 3, 4, 5, 6]);
let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c) as
ArrayRef)])
.expect("cannot create record batch");
- let split = split_batch_for_grpc_response(batch.clone(),
max_flight_data_size);
+ let split: Vec<_> =
+ split_batch_for_grpc_response(batch.clone(),
max_flight_data_size).collect();
assert_eq!(split.len(), 1);
assert_eq!(batch, split[0]);
@@ -1868,7 +1869,8 @@ mod tests {
let c = UInt8Array::from((0..n_rows).map(|i| (i % 256) as
u8).collect::<Vec<_>>());
let batch = RecordBatch::try_from_iter(vec![("a", Arc::new(c) as
ArrayRef)])
.expect("cannot create record batch");
- let split = split_batch_for_grpc_response(batch.clone(),
max_flight_data_size);
+ let split: Vec<_> =
+ split_batch_for_grpc_response(batch.clone(),
max_flight_data_size).collect();
assert_eq!(split.len(), 3);
assert_eq!(
split.iter().map(|batch| batch.num_rows()).sum::<usize>(),
@@ -1912,7 +1914,8 @@ mod tests {
let input_rows = batch.num_rows();
- let split = split_batch_for_grpc_response(batch.clone(),
max_flight_data_size_bytes);
+ let split: Vec<_> =
+ split_batch_for_grpc_response(batch.clone(),
max_flight_data_size_bytes).collect();
let sizes: Vec<_> = split.iter().map(RecordBatch::num_rows).collect();
let output_rows: usize = sizes.iter().sum();