This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch branch-1.11
in repository https://gitbox.apache.org/repos/asf/avro.git

commit cd53fadd0229eff796faf4ba7d970a45cd7baf34
Author: Martin Grigorov <[email protected]>
AuthorDate: Wed Jan 5 09:32:42 2022 +0200

    AVRO-3246 Rust: Add new codec: bzip2 (#1389)
    
    * AVRO-3246 Rust: Add new codec: bzip2
    
    Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
    
    * AVRO-3246 Rust: Add new codec: bzip2
    
    Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
    
    * AVRO-3246 Fix an error reported by Rust nightly
    
    https://github.com/apache/avro/runs/4081593652?check_suite_focus=true
    (cherry picked from commit 9da79b68f4e37d2910d48e42dd43e559770fd38b)
---
 lang/rust/Cargo.toml   |  2 ++
 lang/rust/README.md    | 10 ++++++++++
 lang/rust/src/codec.rs | 42 ++++++++++++++++++++++++++++++++++++++++++
 lang/rust/src/lib.rs   |  2 +-
 4 files changed, 55 insertions(+), 1 deletion(-)

diff --git a/lang/rust/Cargo.toml b/lang/rust/Cargo.toml
index 23ddc2a..4501bf0 100644
--- a/lang/rust/Cargo.toml
+++ b/lang/rust/Cargo.toml
@@ -31,6 +31,7 @@ documentation = "https://docs.rs/avro-rs";
 [features]
 snappy = ["crc", "snap"]
 zstandard = ["zstd"]
+bzip = ["bzip2"]
 
 [lib]
 path = "src/lib.rs"
@@ -51,6 +52,7 @@ harness = false
 
 [dependencies]
 byteorder = "1.4.3"
+bzip2 = { version = "0.4.3", optional = true }
 crc = { version = "1.8.1", optional = true }
 digest = "0.9"
 libflate = "1.1.1"
diff --git a/lang/rust/README.md b/lang/rust/README.md
index 9d0ef24..93950ca 100644
--- a/lang/rust/README.md
+++ b/lang/rust/README.md
@@ -79,6 +79,14 @@ version = "x.y"
 features = ["zstandard"]
 ```
 
+Or in case you want to leverage the **Bzip2** codec:
+
+```toml
+[dependencies.avro-rs]
+version = "x.y"
+features = ["bzip"]
+```
+
 ## Upgrading to a newer minor version
 
 The library is still in beta, so there might be backward-incompatible changes between minor
@@ -254,6 +262,8 @@ compressed block is followed by the 4-byte, big-endianCRC32 checksum of the unco
 the block. You must enable the `snappy` feature to use this codec.
 * **Zstandard**: uses Facebook's [Zstandard](https://facebook.github.io/zstd/) compression library.
 You must enable the `zstandard` feature to use this codec.
+* **Bzip2**: uses the [BZip2](https://sourceware.org/bzip2/) compression library.
+You must enable the `bzip` feature to use this codec.
 
 To specify a codec to use to compress data, just specify it while creating a `Writer`:
 ```rust
diff --git a/lang/rust/src/codec.rs b/lang/rust/src/codec.rs
index a27f058..84f0381 100644
--- a/lang/rust/src/codec.rs
+++ b/lang/rust/src/codec.rs
@@ -21,6 +21,12 @@ use libflate::deflate::{Decoder, Encoder};
 use std::io::{Read, Write};
 use strum_macros::{EnumString, IntoStaticStr};
 
+#[cfg(feature = "bzip")]
+use bzip2::{
+    read::{BzDecoder, BzEncoder},
+    Compression,
+};
+
 /// The compression codec used to compress blocks.
 #[derive(Clone, Copy, Debug, PartialEq, EnumString, IntoStaticStr)]
 #[strum(serialize_all = "kebab_case")]
@@ -38,6 +44,10 @@ pub enum Codec {
     Snappy,
     #[cfg(feature = "zstandard")]
     Zstd,
+    #[cfg(feature = "bzip")]
+    /// The `BZip2` codec uses [BZip2](https://sourceware.org/bzip2/)
+    /// compression library.
+    Bzip2,
 }
 
 impl From<Codec> for Value {
@@ -81,6 +91,13 @@ impl Codec {
                 encoder.write_all(stream).map_err(Error::ZstdCompress)?;
                 *stream = encoder.finish().unwrap();
             }
+            #[cfg(feature = "bzip")]
+            Codec::Bzip2 => {
+                let mut encoder = BzEncoder::new(&stream[..], 
Compression::best());
+                let mut buffer = Vec::new();
+                encoder.read_to_end(&mut buffer).unwrap();
+                *stream = buffer;
+            }
         };
 
         Ok(())
@@ -124,6 +141,13 @@ impl Codec {
                 std::io::copy(&mut decoder, &mut 
decoded).map_err(Error::ZstdDecompress)?;
                 decoded
             }
+            #[cfg(feature = "bzip")]
+            Codec::Bzip2 => {
+                let mut decoder = BzDecoder::new(&stream[..]);
+                let mut decoded = Vec::new();
+                decoder.read_to_end(&mut decoded).unwrap();
+                decoded
+            }
         };
         Ok(())
     }
@@ -180,6 +204,18 @@ mod tests {
         assert_eq!(INPUT, stream.as_slice());
     }
 
+    #[cfg(feature = "bzip")] // compiled only when the `bzip` Cargo feature is enabled
+    #[test]
+    fn bzip_compress_and_decompress() { // round-trip: Bzip2-compress INPUT, then decompress it back
+        let codec = Codec::Bzip2;
+        let mut stream = INPUT.to_vec(); // both calls below replace this buffer's contents via &mut
+        codec.compress(&mut stream).unwrap();
+        assert_ne!(INPUT, stream.as_slice()); // compressed bytes must differ from the input...
+        assert!(INPUT.len() > stream.len()); // ...and be shorter (assumes INPUT is compressible)
+        codec.decompress(&mut stream).unwrap();
+        assert_eq!(INPUT, stream.as_slice()); // decompression must restore the original bytes exactly
+    }
+
     #[test]
     fn codec_to_str() {
         assert_eq!(<&str>::from(Codec::Null), "null");
@@ -190,6 +226,9 @@ mod tests {
 
         #[cfg(feature = "zstandard")]
         assert_eq!(<&str>::from(Codec::Zstd), "zstd");
+
+        #[cfg(feature = "bzip")]
+        assert_eq!(<&str>::from(Codec::Bzip2), "bzip2");
     }
 
     #[test]
@@ -205,6 +244,9 @@ mod tests {
         #[cfg(feature = "zstandard")]
         assert_eq!(Codec::from_str("zstd").unwrap(), Codec::Zstd);
 
+        #[cfg(feature = "bzip")]
+        assert_eq!(Codec::from_str("bzip2").unwrap(), Codec::Bzip2);
+
         assert!(Codec::from_str("not a codec").is_err());
     }
 }
diff --git a/lang/rust/src/lib.rs b/lang/rust/src/lib.rs
index ca14939..18d9230 100644
--- a/lang/rust/src/lib.rs
+++ b/lang/rust/src/lib.rs
@@ -744,7 +744,7 @@ pub use codec::Codec;
 pub use de::from_value;
 pub use decimal::Decimal;
 pub use duration::{Days, Duration, Millis, Months};
-pub use error::{Error, Error as DeError, Error as SerError};
+pub use error::Error;
 pub use reader::{from_avro_datum, Reader};
 pub use schema::Schema;
 pub use ser::to_value;

Reply via email to