This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 0c46d7fa10 feat: support block gzip for streams (#9175)
0c46d7fa10 is described below
commit 0c46d7fa105fddc4a35a4c99e4aa2a063d967abb
Author: Trent Hauck <[email protected]>
AuthorDate: Wed Feb 14 03:13:55 2024 -0800
feat: support block gzip for streams (#9175)
* feat: support block gzip for streams
* test: add bgzip test
* style: remove unused imports
---
.../file_format/file_compression_type.rs | 54 ++++++++++++++++++++--
1 file changed, 51 insertions(+), 3 deletions(-)
diff --git a/datafusion/core/src/datasource/file_format/file_compression_type.rs b/datafusion/core/src/datasource/file_format/file_compression_type.rs
index 3dac7c2930..48094eede8 100644
--- a/datafusion/core/src/datasource/file_format/file_compression_type.rs
+++ b/datafusion/core/src/datasource/file_format/file_compression_type.rs
@@ -172,9 +172,14 @@ impl FileCompressionType {
) -> Result<BoxStream<'static, Result<Bytes>>> {
Ok(match self.variant {
#[cfg(feature = "compression")]
- GZIP => ReaderStream::new(AsyncGzDecoder::new(StreamReader::new(s)))
- .map_err(DataFusionError::from)
- .boxed(),
+ GZIP => {
+ let mut decoder = AsyncGzDecoder::new(StreamReader::new(s));
+ decoder.multiple_members(true);
+
+ ReaderStream::new(decoder)
+ .map_err(DataFusionError::from)
+ .boxed()
+ }
#[cfg(feature = "compression")]
BZIP2 => ReaderStream::new(AsyncBzDecoder::new(StreamReader::new(s)))
.map_err(DataFusionError::from)
@@ -260,7 +265,9 @@ mod tests {
FileCompressionType, FileTypeExt,
};
use crate::error::DataFusionError;
+ use bytes::Bytes;
use datafusion_common::file_options::file_type::FileType;
+ use futures::StreamExt;
use std::str::FromStr;
#[test]
@@ -340,4 +347,45 @@ mod tests {
Err(DataFusionError::NotImplemented(_))
));
}
+
+ #[tokio::test]
+ async fn test_bgzip_stream_decoding() -> Result<(), DataFusionError> {
+ // As described in https://samtools.github.io/hts-specs/SAMv1.pdf ("The BGZF compression format")
+
+ // Ignore rust formatting so the byte array is easier to read
+ #[rustfmt::skip]
+ let data = [
+ // Block header
+ 0x1f, 0x8b, 0x08, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0x06, 0x00, 0x42, 0x43,
+ 0x02, 0x00,
+ // Block 0, literal: 42
+ 0x1e, 0x00, 0x33, 0x31, 0xe2, 0x02, 0x00, 0x31, 0x29, 0x86, 0xd1, 0x03, 0x00, 0x00, 0x00,
+ // Block header
+ 0x1f, 0x8b, 0x08, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0x06, 0x00, 0x42, 0x43,
+ 0x02, 0x00,
+ // Block 1, literal: 42
+ 0x1e, 0x00, 0x33, 0x31, 0xe2, 0x02, 0x00, 0x31, 0x29, 0x86, 0xd1, 0x03, 0x00, 0x00, 0x00,
+ // EOF
+ 0x1f, 0x8b, 0x08, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0x06, 0x00, 0x42, 0x43,
+ 0x02, 0x00, 0x1b, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+ ];
+
+ // Create a byte stream
+ let stream = futures::stream::iter(vec![Ok::<Bytes, DataFusionError>(
+ Bytes::from(data.to_vec()),
+ )]);
+ let converted_stream =
+ FileCompressionType::GZIP.convert_stream(stream.boxed())?;
+
+ let vec = converted_stream
+ .map(|r| r.unwrap())
+ .collect::<Vec<Bytes>>()
+ .await;
+
+ let string_value = String::from_utf8_lossy(&vec[0]);
+
+ assert_eq!(string_value, "42\n42\n");
+
+ Ok(())
+ }
}