This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 4e1247e8c Added support for LZ4_RAW compression. (#1604) (#2943)
4e1247e8c is described below
commit 4e1247e8c03f36940a912256e2d94f49a1b581df
Author: Adrián Gallego Castellanos <[email protected]>
AuthorDate: Thu Oct 27 22:59:25 2022 +0200
Added support for LZ4_RAW compression. (#1604) (#2943)
* Added support for LZ4_RAW compression. (#1604)
* This adds the implementation of LZ4_RAW codec by using lz4 block
compression algorithm. (#1604)
* This commit uses
https://stackoverflow.com/questions/25740471/lz4-library-decompressed-data-upper-bound-size-estimation
formula to estimate the uncompressed size. As said in the thread, this
algorithm over-estimates the size, but it is probably the best we can get with
the current decompress API, as the size of an Arrow LZ4_RAW block is not
prepended to the block.
* Another option would be to take the C++ approach to bypass the API
(https://github.com/apache/arrow/blob/master/cpp/src/arrow/util/compression_lz4.cc#L343).
This approach consists of relying on the output_buffer capacity to guess the
uncompress_size. This works as `serialized_reader.rs` already knows the
uncompressed_size, as it reads it from the page header, and allocates the
output_buffer with a capacity equal to the uncompress_size
(https://github.com/marioloko/arrow-rs/blob/maste [...]
1. It is too hacky.
2. It will limit the use cases of the `decompress` API, as the caller
will need to know to allocate the right uncompressed_size.
3. It is not compatible with the current set of tests. However, new
tests can be created.
* Clippy
* Add integration test
Co-authored-by: Adrián Gallego Castellanos <[email protected]>
Co-authored-by: Raphael Taylor-Davies <[email protected]>
---
parquet/src/arrow/arrow_reader/mod.rs | 31 +++++++++++++++++
parquet/src/basic.rs | 4 +++
parquet/src/compression.rs | 64 +++++++++++++++++++++++++++++++++++
3 files changed, 99 insertions(+)
diff --git a/parquet/src/arrow/arrow_reader/mod.rs
b/parquet/src/arrow/arrow_reader/mod.rs
index 51b09302c..7f68b07eb 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -2390,4 +2390,35 @@ mod tests {
assert_eq!(full.column(idx), projected.column(0));
}
}
+
+ #[test]
+ fn test_read_lz4_raw() {
+ let testdata = arrow::util::test_util::parquet_test_data();
+ let path = format!("{}/lz4_raw_compressed.parquet", testdata);
+ let file = File::open(&path).unwrap();
+
+ let batches = ParquetRecordBatchReader::try_new(file, 1024)
+ .unwrap()
+ .collect::<ArrowResult<Vec<_>>>()
+ .unwrap();
+ assert_eq!(batches.len(), 1);
+ let batch = &batches[0];
+
+ assert_eq!(batch.num_columns(), 3);
+ assert_eq!(batch.num_rows(), 4);
+
+ // https://github.com/apache/parquet-testing/pull/18
+ let a: &Int64Array = batch.column(0).as_any().downcast_ref().unwrap();
+ assert_eq!(
+ a.values(),
+ &[1593604800, 1593604800, 1593604801, 1593604801]
+ );
+
+ let a: &BinaryArray = batch.column(1).as_any().downcast_ref().unwrap();
+ let a: Vec<_> = a.iter().flatten().collect();
+ assert_eq!(a, &[b"abc", b"def", b"abc", b"def"]);
+
+ let a: &Float64Array =
batch.column(2).as_any().downcast_ref().unwrap();
+ assert_eq!(a.values(), &[42.000000, 7.700000, 42.125000, 7.700000]);
+ }
}
diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs
index b0f591c7a..96cdd537d 100644
--- a/parquet/src/basic.rs
+++ b/parquet/src/basic.rs
@@ -282,6 +282,7 @@ pub enum Encoding {
/// Supported compression algorithms.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+#[allow(non_camel_case_types)]
pub enum Compression {
UNCOMPRESSED,
SNAPPY,
@@ -290,6 +291,7 @@ pub enum Compression {
BROTLI,
LZ4,
ZSTD,
+ LZ4_RAW,
}
// ----------------------------------------------------------------------
@@ -826,6 +828,7 @@ impl TryFrom<parquet::CompressionCodec> for Compression {
parquet::CompressionCodec::BROTLI => Compression::BROTLI,
parquet::CompressionCodec::LZ4 => Compression::LZ4,
parquet::CompressionCodec::ZSTD => Compression::ZSTD,
+ parquet::CompressionCodec::LZ4_RAW => Compression::LZ4_RAW,
_ => {
return Err(general_err!(
"unexpected parquet compression codec: {}",
@@ -846,6 +849,7 @@ impl From<Compression> for parquet::CompressionCodec {
Compression::BROTLI => parquet::CompressionCodec::BROTLI,
Compression::LZ4 => parquet::CompressionCodec::LZ4,
Compression::ZSTD => parquet::CompressionCodec::ZSTD,
+ Compression::LZ4_RAW => parquet::CompressionCodec::LZ4_RAW,
}
}
}
diff --git a/parquet/src/compression.rs b/parquet/src/compression.rs
index ee5141cbe..f110e3d82 100644
--- a/parquet/src/compression.rs
+++ b/parquet/src/compression.rs
@@ -77,6 +77,8 @@ pub fn create_codec(codec: CodecType) ->
Result<Option<Box<dyn Codec>>> {
CodecType::LZ4 => Ok(Some(Box::new(LZ4Codec::new()))),
#[cfg(any(feature = "zstd", test))]
CodecType::ZSTD => Ok(Some(Box::new(ZSTDCodec::new()))),
+ #[cfg(any(feature = "lz4", test))]
+ CodecType::LZ4_RAW => Ok(Some(Box::new(LZ4RawCodec::new()))),
CodecType::UNCOMPRESSED => Ok(None),
_ => Err(nyi_err!("The codec type {} is not supported yet", codec)),
}
@@ -325,6 +327,63 @@ mod zstd_codec {
#[cfg(any(feature = "zstd", test))]
pub use zstd_codec::*;
+#[cfg(any(feature = "lz4", test))]
+mod lz4_raw_codec {
+ use crate::compression::Codec;
+ use crate::errors::Result;
+
+ /// Codec for LZ4 Raw compression algorithm.
+ pub struct LZ4RawCodec {}
+
+ impl LZ4RawCodec {
+ /// Creates new LZ4 Raw compression codec.
+ pub(crate) fn new() -> Self {
+ Self {}
+ }
+ }
+
+ // Compute max LZ4 uncompress size.
+ // Check
https://stackoverflow.com/questions/25740471/lz4-library-decompressed-data-upper-bound-size-estimation
+ fn max_uncompressed_size(compressed_size: usize) -> usize {
+ (compressed_size << 8) - compressed_size - 2526
+ }
+
+ impl Codec for LZ4RawCodec {
+ fn decompress(
+ &mut self,
+ input_buf: &[u8],
+ output_buf: &mut Vec<u8>,
+ ) -> Result<usize> {
+ let offset = output_buf.len();
+ let required_len = max_uncompressed_size(input_buf.len());
+ output_buf.resize(offset + required_len, 0);
+ let required_len: i32 = required_len.try_into().unwrap();
+ match lz4::block::decompress_to_buffer(input_buf,
Some(required_len), &mut output_buf[offset..]) {
+ Ok(n) => {
+ output_buf.truncate(offset + n);
+ Ok(n)
+ },
+ Err(e) => Err(e.into()),
+ }
+ }
+
+ fn compress(&mut self, input_buf: &[u8], output_buf: &mut Vec<u8>) ->
Result<()> {
+ let offset = output_buf.len();
+ let required_len = lz4::block::compress_bound(input_buf.len())?;
+ output_buf.resize(offset + required_len, 0);
+ match lz4::block::compress_to_buffer(input_buf, None, false, &mut
output_buf[offset..]) {
+ Ok(n) => {
+ output_buf.truncate(offset + n);
+ Ok(())
+ },
+ Err(e) => Err(e.into()),
+ }
+ }
+ }
+}
+#[cfg(any(feature = "lz4", test))]
+pub use lz4_raw_codec::*;
+
#[cfg(test)]
mod tests {
use super::*;
@@ -416,4 +475,9 @@ mod tests {
fn test_codec_zstd() {
test_codec(CodecType::ZSTD);
}
+
+ #[test]
+ fn test_codec_lz4_raw() {
+ test_codec(CodecType::LZ4_RAW);
+ }
}