This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 4a9061597a Add arrow-flight test coverage for IPC compression (#10097)
4a9061597a is described below
commit 4a9061597a3c372a76e3ab3c38988ef1d0df2410
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Jun 18 15:37:30 2026 -0400
Add arrow-flight test coverage for IPC compression (#10097)
# Which issue does this PR close?
None; this is a follow-up from reviewing #10044.
# Rationale for this change
While reviewing #10044 (which reworks the IPC writer's buffer handling),
I found that the **compressed `IpcDataGenerator::encode` path is not
exercised by any test in the repository**.
# What changes are included in this PR?
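This PR adds that missing coverage: a `test_compression_round_trip` test in `arrow-flight/src/encode.rs` that round-trips a batch through Flight with `LZ4_FRAME` and `ZSTD` IPC body compression, a `verify_flight_round_trip_with_options` test helper that accepts custom `IpcWriteOptions`, and the `lz4` and `zstd` features of `arrow-ipc` enabled in `arrow-flight`'s dev-dependencies.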
# Are these changes tested?
Yes. This PR is test-only: it adds a new test and a test helper, and it does not change any library code.
# Are there any user-facing changes?
No.
Co-authored-by: Claude Opus 4.8 (1M context) <[email protected]>
---
arrow-flight/Cargo.toml | 2 ++
arrow-flight/src/encode.rs | 36 +++++++++++++++++++++++++++++++++---
2 files changed, 35 insertions(+), 3 deletions(-)
diff --git a/arrow-flight/Cargo.toml b/arrow-flight/Cargo.toml
index 8e399fbc5a..46fcd08103 100644
--- a/arrow-flight/Cargo.toml
+++ b/arrow-flight/Cargo.toml
@@ -75,6 +75,8 @@ cli = ["arrow-array/chrono-tz", "arrow-cast/prettyprint",
"tonic/tls-webpki-root
[dev-dependencies]
arrow-cast = { workspace = true, features = ["prettyprint"] }
+# Enable the IPC compression codecs so tests can exercise compressed Flight encoding
+arrow-ipc = { workspace = true, features = ["lz4", "zstd"] }
assert_cmd = "2.0.8"
criterion = { workspace = true, default-features = false, features =
["async_tokio"] }
http = "1.1.0"
diff --git a/arrow-flight/src/encode.rs b/arrow-flight/src/encode.rs
index 29ac7e9574..f2d0d25586 100644
--- a/arrow-flight/src/encode.rs
+++ b/arrow-flight/src/encode.rs
@@ -793,7 +793,7 @@ mod tests {
use arrow_array::{cast::downcast_array, types::*};
use arrow_buffer::ScalarBuffer;
use arrow_cast::pretty::pretty_format_batches;
- use arrow_ipc::MetadataVersion;
+ use arrow_ipc::{CompressionType, MetadataVersion};
use arrow_schema::{UnionFields, UnionMode};
use builder::MapBuilder;
use std::collections::HashMap;
@@ -893,6 +893,27 @@ mod tests {
verify_flight_round_trip(vec![batch1, batch2]).await;
}
+ #[tokio::test]
+ async fn test_compression_round_trip() {
+ // Round trip a batch through Flight with IPC body compression enabled. This exercises
+ // the compressed `IpcDataGenerator::encode` path (per-buffer codec output), which the
+ // uncompressed Flight tests and the writer-based compression tests do not cover.
+ let ints = Int32Array::from_iter_values((0..1024).map(|i| i % 8));
+ let strings = StringArray::from_iter_values((0..1024).map(|i|
format!("value-{}", i % 8)));
+ let batch = RecordBatch::try_from_iter(vec![
+ ("ints", Arc::new(ints) as ArrayRef),
+ ("strings", Arc::new(strings) as ArrayRef),
+ ])
+ .unwrap();
+
+ for compression in [CompressionType::LZ4_FRAME, CompressionType::ZSTD]
{
+ let options = IpcWriteOptions::default()
+ .try_with_compression(Some(compression))
+ .unwrap();
+ verify_flight_round_trip_with_options(vec![batch.clone()],
options).await;
+ }
+ }
+
#[tokio::test]
async fn test_dictionary_hydration_known_schema() {
let arr1: DictionaryArray<UInt16Type> = vec!["a", "a",
"b"].into_iter().collect();
@@ -1742,11 +1763,20 @@ mod tests {
verify_flight_round_trip(vec![batch1, batch2]).await;
}
- async fn verify_flight_round_trip(mut batches: Vec<RecordBatch>) {
+ async fn verify_flight_round_trip(batches: Vec<RecordBatch>) {
+ verify_flight_round_trip_with_options(batches,
IpcWriteOptions::default()).await;
+ }
+
+ /// Encode `batches` through a [`FlightDataEncoderBuilder`] using `options`, decode them
+ /// again, and assert the decoded batches match the originals.
+ async fn verify_flight_round_trip_with_options(
+ mut batches: Vec<RecordBatch>,
+ options: IpcWriteOptions,
+ ) {
let expected_schema = batches.first().unwrap().schema();
let encoder = FlightDataEncoderBuilder::default()
- .with_options(IpcWriteOptions::default())
+ .with_options(options)
.with_dictionary_handling(DictionaryHandling::Resend)
.build(futures::stream::iter(batches.clone().into_iter().map(Ok)));