This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 620e39dea9 fix: lazy-init zstd compression contexts to avoid unnecessary FFI calls (#9808)
620e39dea9 is described below

commit 620e39dea9951f6e1e246a421a9e679fc00a263e
Author: Matt Butrovich <[email protected]>
AuthorDate: Fri Apr 24 12:13:09 2026 -0400

    fix: lazy-init zstd compression contexts to avoid unnecessary FFI calls (#9808)
    
    # Which issue does this PR close?
    
    - Closes #9463.
    
    # Rationale for this change
    
    `CompressionContext::default()` and `DecompressionContext::default()`
    eagerly create a zstd compressor/decompressor via FFI even when no
    compression is selected. This causes Miri failures (Miri cannot execute
    the foreign zstd C calls) and unnecessary overhead for users who never
    use zstd compression.
    
    # What changes are included in this PR?
    
    Wrap the zstd compressor/decompressor fields in `Option` and
    lazily initialize them on first use via `get_or_insert_with`. `Default`
    now sets these to `None`, avoiding the FFI call to `ZSTD_createCCtx()` /
    `ZSTD_createDCtx()` until zstd compression or decompression is actually
    requested.
    
    # Are these changes tested?
    
    Covered by existing `test_zstd_compression` and `test_lz4_compression`
    tests in `compression.rs`, plus IPC round-trip tests.
    
    # Are there any user-facing changes?
    
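    No. `CompressionContext` and `DecompressionContext` are public but
    opaque structs with no public fields or public inherent methods. Behavior
    is unchanged, except that their `Debug` output now wraps the context name
    in an `Option`, showing `None` until the zstd context has been created.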
---
 arrow-ipc/src/compression.rs | 48 +++++++++++++++++++++++++++++++++-----------
 1 file changed, 36 insertions(+), 12 deletions(-)

diff --git a/arrow-ipc/src/compression.rs b/arrow-ipc/src/compression.rs
index 1b7c84d9f0..f5327fa74f 100644
--- a/arrow-ipc/src/compression.rs
+++ b/arrow-ipc/src/compression.rs
@@ -29,29 +29,38 @@ const LENGTH_OF_PREFIX_DATA: i64 = 8;
 /// compression.
 pub struct CompressionContext {
     #[cfg(feature = "zstd")]
-    compressor: zstd::bulk::Compressor<'static>,
+    compressor: Option<zstd::bulk::Compressor<'static>>,
 }
 
-// the reason we allow derivable_impls here is because when zstd feature is 
not enabled, this
-// becomes derivable. however with zstd feature want to be explicit about the 
compression level.
 #[allow(clippy::derivable_impls)]
 impl Default for CompressionContext {
     fn default() -> Self {
         CompressionContext {
-            // safety: `new` here will only return error here if using an 
invalid compression level
             #[cfg(feature = "zstd")]
-            compressor: 
zstd::bulk::Compressor::new(zstd::DEFAULT_COMPRESSION_LEVEL)
-                .expect("can use default compression level"),
+            compressor: None,
         }
     }
 }
 
+impl CompressionContext {
+    #[cfg(feature = "zstd")]
+    fn zstd_compressor(&mut self) -> &mut zstd::bulk::Compressor<'static> {
+        self.compressor.get_or_insert_with(|| {
+            zstd::bulk::Compressor::new(zstd::DEFAULT_COMPRESSION_LEVEL)
+                .expect("can use default compression level")
+        })
+    }
+}
+
 impl std::fmt::Debug for CompressionContext {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         let mut ds = f.debug_struct("CompressionContext");
 
         #[cfg(feature = "zstd")]
-        ds.field("compressor", &"zstd::bulk::Compressor");
+        ds.field(
+            "compressor",
+            &self.compressor.as_ref().map(|_| "zstd::bulk::Compressor"),
+        );
 
         ds.finish()
     }
@@ -64,13 +73,20 @@ impl std::fmt::Debug for CompressionContext {
 /// context for every decompression.
 pub struct DecompressionContext {
     #[cfg(feature = "zstd")]
-    decompressor: zstd::bulk::Decompressor<'static>,
+    decompressor: Option<zstd::bulk::Decompressor<'static>>,
 }
 
 impl DecompressionContext {
     pub(crate) fn new() -> Self {
         Default::default()
     }
+
+    #[cfg(feature = "zstd")]
+    fn zstd_decompressor(&mut self) -> &mut zstd::bulk::Decompressor<'static> {
+        self.decompressor.get_or_insert_with(|| {
+            zstd::bulk::Decompressor::new().expect("can create zstd 
decompressor")
+        })
+    }
 }
 
 #[allow(clippy::derivable_impls)]
@@ -78,7 +94,7 @@ impl Default for DecompressionContext {
     fn default() -> Self {
         DecompressionContext {
             #[cfg(feature = "zstd")]
-            decompressor: zstd::bulk::Decompressor::new().expect("can create 
zstd decompressor"),
+            decompressor: None,
         }
     }
 }
@@ -88,7 +104,13 @@ impl std::fmt::Debug for DecompressionContext {
         let mut ds = f.debug_struct("DecompressionContext");
 
         #[cfg(feature = "zstd")]
-        ds.field("decompressor", &"zstd::bulk::Decompressor");
+        ds.field(
+            "decompressor",
+            &self
+                .decompressor
+                .as_ref()
+                .map(|_| "zstd::bulk::Decompressor"),
+        );
 
         ds.finish()
     }
@@ -267,7 +289,7 @@ fn compress_zstd(
     output: &mut Vec<u8>,
     context: &mut CompressionContext,
 ) -> Result<(), ArrowError> {
-    let result = context.compressor.compress(input)?;
+    let result = context.zstd_compressor().compress(input)?;
     output.extend_from_slice(&result);
     Ok(())
 }
@@ -290,7 +312,9 @@ fn decompress_zstd(
     decompressed_size: usize,
     context: &mut DecompressionContext,
 ) -> Result<Vec<u8>, ArrowError> {
-    let output = context.decompressor.decompress(input, decompressed_size)?;
+    let output = context
+        .zstd_decompressor()
+        .decompress(input, decompressed_size)?;
     Ok(output)
 }
 

Reply via email to