This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit 8aa071cd4135525e3ca550f0fc16623a1cf2750a
Author: sycured <[email protected]>
AuthorDate: Wed Jan 5 02:01:28 2022 -0500

    AVRO-3234: add new codec to lang/rust: zstandard (#1370)
    
    * AVRO-3234: add new codec zstandard
    
    * update MSRV to 1.51.0
    
    (cherry picked from commit 04e41fc2c8aa21e829f690e791e8aa2b0a049700)
---
 .github/workflows/test-lang-rust-ci.yml |  2 +-
 lang/rust/Cargo.toml                    |  2 ++
 lang/rust/README.md                     | 10 ++++++++++
 lang/rust/src/codec.rs                  | 33 +++++++++++++++++++++++++++++++++
 lang/rust/src/error.rs                  |  6 ++++++
 5 files changed, 52 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/test-lang-rust-ci.yml b/.github/workflows/test-lang-rust-ci.yml
index 977ea11..dacf461 100644
--- a/.github/workflows/test-lang-rust-ci.yml
+++ b/.github/workflows/test-lang-rust-ci.yml
@@ -39,7 +39,7 @@ jobs:
           - stable
           - beta
           - nightly
-          - 1.48.0  # MSRV
+          - 1.51.0  # MSRV
 
     steps:
       - name: Checkout
diff --git a/lang/rust/Cargo.toml b/lang/rust/Cargo.toml
index d82e5ff..23ddc2a 100644
--- a/lang/rust/Cargo.toml
+++ b/lang/rust/Cargo.toml
@@ -30,6 +30,7 @@ documentation = "https://docs.rs/avro-rs"
 
 [features]
 snappy = ["crc", "snap"]
+zstandard = ["zstd"]
 
 [lib]
 path = "src/lib.rs"
@@ -66,6 +67,7 @@ typed-builder = "0.9.1"
 uuid = { version = "0.8.2", features = ["serde", "v4"] }
 zerocopy = "0.3.0"
 lazy_static = "1.1.1"
+zstd = { version = "0.9.0+zstd.1.5.0" , optional = true }
 
 [dev-dependencies]
 md-5 = "0.9.1"
diff --git a/lang/rust/README.md b/lang/rust/README.md
index e934cc3..9d0ef24 100644
--- a/lang/rust/README.md
+++ b/lang/rust/README.md
@@ -71,6 +71,14 @@ version = "x.y"
 features = ["snappy"]
 ```
 
+Or in case you want to leverage the **Zstandard** codec:
+
+```toml
+[dependencies.avro-rs]
+version = "x.y"
+features = ["zstandard"]
+```
+
 ## Upgrading to a newer minor version
 
 The library is still in beta, so there might be backward-incompatible changes between minor
@@ -244,6 +252,8 @@ RFC 1950) does not have a checksum.
 * **Snappy**: uses Google's [Snappy](http://google.github.io/snappy/) compression library. Each
 compressed block is followed by the 4-byte, big-endianCRC32 checksum of the uncompressed data in
 the block. You must enable the `snappy` feature to use this codec.
+* **Zstandard**: uses Facebook's [Zstandard](https://facebook.github.io/zstd/) compression library.
+You must enable the `zstandard` feature to use this codec.
 
 To specify a codec to use to compress data, just specify it while creating a `Writer`:
 ```rust
diff --git a/lang/rust/src/codec.rs b/lang/rust/src/codec.rs
index 25d395a..a27f058 100644
--- a/lang/rust/src/codec.rs
+++ b/lang/rust/src/codec.rs
@@ -36,6 +36,8 @@ pub enum Codec {
     /// compression library. Each compressed block is followed by the 4-byte, big-endian
     /// CRC32 checksum of the uncompressed data in the block.
     Snappy,
+    #[cfg(feature = "zstandard")]
+    Zstd,
 }
 
 impl From<Codec> for Value {
@@ -73,6 +75,12 @@ impl Codec {
 
                 *stream = encoded;
             }
+            #[cfg(feature = "zstandard")]
+            Codec::Zstd => {
+                let mut encoder = zstd::Encoder::new(Vec::new(), 0).unwrap();
+                encoder.write_all(stream).map_err(Error::ZstdCompress)?;
+                *stream = encoder.finish().unwrap();
+            }
         };
 
         Ok(())
@@ -109,6 +117,13 @@ impl Codec {
                 }
                 decoded
             }
+            #[cfg(feature = "zstandard")]
+            Codec::Zstd => {
+                let mut decoded = Vec::new();
+                let mut decoder = zstd::Decoder::new(&stream[..]).unwrap();
+                std::io::copy(&mut decoder, &mut decoded).map_err(Error::ZstdDecompress)?;
+                decoded
+            }
         };
         Ok(())
     }
@@ -153,6 +168,18 @@ mod tests {
         assert_eq!(INPUT, stream.as_slice());
     }
 
+    #[cfg(feature = "zstandard")]
+    #[test]
+    fn zstd_compress_and_decompress() {
+        let codec = Codec::Zstd;
+        let mut stream = INPUT.to_vec();
+        codec.compress(&mut stream).unwrap();
+        assert_ne!(INPUT, stream.as_slice());
+        assert!(INPUT.len() > stream.len());
+        codec.decompress(&mut stream).unwrap();
+        assert_eq!(INPUT, stream.as_slice());
+    }
+
     #[test]
     fn codec_to_str() {
         assert_eq!(<&str>::from(Codec::Null), "null");
@@ -160,6 +187,9 @@ mod tests {
 
         #[cfg(feature = "snappy")]
         assert_eq!(<&str>::from(Codec::Snappy), "snappy");
+
+        #[cfg(feature = "zstandard")]
+        assert_eq!(<&str>::from(Codec::Zstd), "zstd");
     }
 
     #[test]
@@ -172,6 +202,9 @@ mod tests {
         #[cfg(feature = "snappy")]
         assert_eq!(Codec::from_str("snappy").unwrap(), Codec::Snappy);
 
+        #[cfg(feature = "zstandard")]
+        assert_eq!(Codec::from_str("zstd").unwrap(), Codec::Zstd);
+
         assert!(Codec::from_str("not a codec").is_err());
     }
 }
diff --git a/lang/rust/src/error.rs b/lang/rust/src/error.rs
index 8f38f76..b690bfb 100644
--- a/lang/rust/src/error.rs
+++ b/lang/rust/src/error.rs
@@ -309,6 +309,12 @@ pub enum Error {
     #[error("Failed to decompress with snappy")]
     SnappyDecompress(#[source] snap::Error),
 
+    #[error("Failed to compress with zstd")]
+    ZstdCompress(#[source] std::io::Error),
+
+    #[error("Failed to decompress with zstd")]
+    ZstdDecompress(#[source] std::io::Error),
+
     #[error("Failed to read header")]
     ReadHeader(#[source] std::io::Error),
 

Reply via email to