This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 0c46d7fa10 feat: support block gzip for streams (#9175)
0c46d7fa10 is described below

commit 0c46d7fa105fddc4a35a4c99e4aa2a063d967abb
Author: Trent Hauck <[email protected]>
AuthorDate: Wed Feb 14 03:13:55 2024 -0800

    feat: support block gzip for streams (#9175)
    
    * feat: support block gzip for streams
    
    * test: add bgzip test
    
    * style: remove unused imports
---
 .../file_format/file_compression_type.rs           | 54 ++++++++++++++++++++--
 1 file changed, 51 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/src/datasource/file_format/file_compression_type.rs b/datafusion/core/src/datasource/file_format/file_compression_type.rs
index 3dac7c2930..48094eede8 100644
--- a/datafusion/core/src/datasource/file_format/file_compression_type.rs
+++ b/datafusion/core/src/datasource/file_format/file_compression_type.rs
@@ -172,9 +172,14 @@ impl FileCompressionType {
     ) -> Result<BoxStream<'static, Result<Bytes>>> {
         Ok(match self.variant {
             #[cfg(feature = "compression")]
-            GZIP => ReaderStream::new(AsyncGzDecoder::new(StreamReader::new(s)))
-                .map_err(DataFusionError::from)
-                .boxed(),
+            GZIP => {
+                let mut decoder = AsyncGzDecoder::new(StreamReader::new(s));
+                decoder.multiple_members(true);
+
+                ReaderStream::new(decoder)
+                    .map_err(DataFusionError::from)
+                    .boxed()
+            }
             #[cfg(feature = "compression")]
             BZIP2 => ReaderStream::new(AsyncBzDecoder::new(StreamReader::new(s)))
                 .map_err(DataFusionError::from)
@@ -260,7 +265,9 @@ mod tests {
         FileCompressionType, FileTypeExt,
     };
     use crate::error::DataFusionError;
+    use bytes::Bytes;
     use datafusion_common::file_options::file_type::FileType;
+    use futures::StreamExt;
     use std::str::FromStr;
 
     #[test]
@@ -340,4 +347,45 @@ mod tests {
             Err(DataFusionError::NotImplemented(_))
         ));
     }
+
+    #[tokio::test]
+    async fn test_bgzip_stream_decoding() -> Result<(), DataFusionError> {
+        // As described in https://samtools.github.io/hts-specs/SAMv1.pdf ("The BGZF compression format")
+
+        // Ignore rust formatting so the byte array is easier to read
+        #[rustfmt::skip]
+        let data = [
+            // Block header
+            0x1f, 0x8b, 0x08, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0x06, 0x00, 0x42, 0x43,
+            0x02, 0x00,
+            // Block 0, literal: 42
+            0x1e, 0x00, 0x33, 0x31, 0xe2, 0x02, 0x00, 0x31, 0x29, 0x86, 0xd1, 0x03, 0x00, 0x00, 0x00,
+            // Block header
+            0x1f, 0x8b, 0x08, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0x06, 0x00, 0x42, 0x43,
+            0x02, 0x00,
+            // Block 1, literal: 42
+            0x1e, 0x00, 0x33, 0x31, 0xe2, 0x02, 0x00, 0x31, 0x29, 0x86, 0xd1, 0x03, 0x00, 0x00, 0x00,
+            // EOF
+            0x1f, 0x8b, 0x08, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0x06, 0x00, 0x42, 0x43,
+            0x02, 0x00, 0x1b, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+        ];
+
+        // Create a byte stream
+        let stream = futures::stream::iter(vec![Ok::<Bytes, DataFusionError>(
+            Bytes::from(data.to_vec()),
+        )]);
+        let converted_stream =
+            FileCompressionType::GZIP.convert_stream(stream.boxed())?;
+
+        let vec = converted_stream
+            .map(|r| r.unwrap())
+            .collect::<Vec<Bytes>>()
+            .await;
+
+        let string_value = String::from_utf8_lossy(&vec[0]);
+
+        assert_eq!(string_value, "42\n42\n");
+
+        Ok(())
+    }
 }

Reply via email to