This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 620e39dea9 fix: lazy-init zstd compression contexts to avoid
unnecessary FFI calls (#9808)
620e39dea9 is described below
commit 620e39dea9951f6e1e246a421a9e679fc00a263e
Author: Matt Butrovich <[email protected]>
AuthorDate: Fri Apr 24 12:13:09 2026 -0400
fix: lazy-init zstd compression contexts to avoid unnecessary FFI calls
(#9808)
# Which issue does this PR close?
- Closes #9463.
# Rationale for this change
`CompressionContext::default()` and `DecompressionContext::default()`
eagerly initialize the zstd compressor/decompressor via FFI even when zstd
is never used (no compression, or LZ4 only). This causes Miri failures and
unnecessary overhead for users who never use zstd compression.
# What changes are included in this PR?
Wrap the zstd compressor/decompressor fields in `Option` and
lazy-initialize them on first use via `get_or_insert_with`. `Default`
now sets these to `None`, avoiding the FFI call to `ZSTD_createCCtx()` /
`ZSTD_createDCtx()` until zstd compression is actually requested.
# Are these changes tested?
Covered by existing `test_zstd_compression` and `test_lz4_compression`
tests in `compression.rs`, plus IPC round-trip tests.
# Are there any user-facing changes?
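No API changes. `CompressionContext` and `DecompressionContext` are public
but opaque structs with no public fields or methods. Compression and
decompression results are unchanged. The only observable differences are
that the `Debug` output now shows `None` for the zstd context until it is
first used, and that a failure to create a zstd context would now panic on
first use rather than in `default()`.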
---
arrow-ipc/src/compression.rs | 48 +++++++++++++++++++++++++++++++++-----------
1 file changed, 36 insertions(+), 12 deletions(-)
diff --git a/arrow-ipc/src/compression.rs b/arrow-ipc/src/compression.rs
index 1b7c84d9f0..f5327fa74f 100644
--- a/arrow-ipc/src/compression.rs
+++ b/arrow-ipc/src/compression.rs
@@ -29,29 +29,38 @@ const LENGTH_OF_PREFIX_DATA: i64 = 8;
/// compression.
pub struct CompressionContext {
#[cfg(feature = "zstd")]
- compressor: zstd::bulk::Compressor<'static>,
+ compressor: Option<zstd::bulk::Compressor<'static>>,
}
-// the reason we allow derivable_impls here is because when zstd feature is not enabled, this
-// becomes derivable. however with zstd feature want to be explicit about the compression level.
#[allow(clippy::derivable_impls)]
impl Default for CompressionContext {
fn default() -> Self {
CompressionContext {
- // safety: `new` here will only return error here if using an invalid compression level
#[cfg(feature = "zstd")]
- compressor: zstd::bulk::Compressor::new(zstd::DEFAULT_COMPRESSION_LEVEL)
- .expect("can use default compression level"),
+ compressor: None,
}
}
}
+impl CompressionContext {
+ #[cfg(feature = "zstd")]
+ fn zstd_compressor(&mut self) -> &mut zstd::bulk::Compressor<'static> {
+ self.compressor.get_or_insert_with(|| {
+ zstd::bulk::Compressor::new(zstd::DEFAULT_COMPRESSION_LEVEL)
+ .expect("can use default compression level")
+ })
+ }
+}
+
impl std::fmt::Debug for CompressionContext {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mut ds = f.debug_struct("CompressionContext");
#[cfg(feature = "zstd")]
- ds.field("compressor", &"zstd::bulk::Compressor");
+ ds.field(
+ "compressor",
+ &self.compressor.as_ref().map(|_| "zstd::bulk::Compressor"),
+ );
ds.finish()
}
@@ -64,13 +73,20 @@ impl std::fmt::Debug for CompressionContext {
/// context for every decompression.
pub struct DecompressionContext {
#[cfg(feature = "zstd")]
- decompressor: zstd::bulk::Decompressor<'static>,
+ decompressor: Option<zstd::bulk::Decompressor<'static>>,
}
impl DecompressionContext {
pub(crate) fn new() -> Self {
Default::default()
}
+
+ #[cfg(feature = "zstd")]
+ fn zstd_decompressor(&mut self) -> &mut zstd::bulk::Decompressor<'static> {
+ self.decompressor.get_or_insert_with(|| {
+ zstd::bulk::Decompressor::new().expect("can create zstd decompressor")
+ })
+ }
}
#[allow(clippy::derivable_impls)]
@@ -78,7 +94,7 @@ impl Default for DecompressionContext {
fn default() -> Self {
DecompressionContext {
#[cfg(feature = "zstd")]
- decompressor: zstd::bulk::Decompressor::new().expect("can create zstd decompressor"),
+ decompressor: None,
}
}
}
@@ -88,7 +104,13 @@ impl std::fmt::Debug for DecompressionContext {
let mut ds = f.debug_struct("DecompressionContext");
#[cfg(feature = "zstd")]
- ds.field("decompressor", &"zstd::bulk::Decompressor");
+ ds.field(
+ "decompressor",
+ &self
+ .decompressor
+ .as_ref()
+ .map(|_| "zstd::bulk::Decompressor"),
+ );
ds.finish()
}
@@ -267,7 +289,7 @@ fn compress_zstd(
output: &mut Vec<u8>,
context: &mut CompressionContext,
) -> Result<(), ArrowError> {
- let result = context.compressor.compress(input)?;
+ let result = context.zstd_compressor().compress(input)?;
output.extend_from_slice(&result);
Ok(())
}
@@ -290,7 +312,9 @@ fn decompress_zstd(
decompressed_size: usize,
context: &mut DecompressionContext,
) -> Result<Vec<u8>, ArrowError> {
- let output = context.decompressor.decompress(input, decompressed_size)?;
+ let output = context
+ .zstd_decompressor()
+ .decompress(input, decompressed_size)?;
Ok(output)
}