adriangb commented on code in PR #6000:
URL: https://github.com/apache/arrow-rs/pull/6000#discussion_r1687212042


##########
parquet/src/file/writer.rs:
##########
@@ -1854,6 +2053,163 @@ mod tests {
         assert!(matches!(b_idx, Index::NONE), "{b_idx:?}");
     }
 
+    /// Return value of [`get_test_metadata`]: the parsed metadata of a
+    /// generated Parquet file together with that file's total size in bytes.
+    struct TestMetadata {
+        // Total encoded file size; used to remap footer-relative reads
+        // when re-loading the metadata (see `load_metadata_from_bytes`).
+        file_size: usize,
+        metadata: ParquetMetaData,
+    }
+
+    fn get_test_metadata(write_page_index: bool, read_page_index: bool) -> 
TestMetadata {
+        let mut buf = BytesMut::new().writer();
+        let schema: Arc<Schema> = Arc::new(Schema::new(vec![Field::new(
+            "a",
+            ArrowDataType::Int32,
+            true,
+        )]));
+
+        let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, 
Some(2)]));
+
+        let batch = RecordBatch::try_from_iter(vec![("a", a)]).unwrap();
+
+        let writer_props = match write_page_index {
+            true => WriterProperties::builder()
+                .set_statistics_enabled(EnabledStatistics::Page)
+                .build(),
+            false => WriterProperties::builder()
+                .set_statistics_enabled(EnabledStatistics::Chunk)
+                .build(),
+        };
+
+        let mut writer = ArrowWriter::try_new(&mut buf, schema, 
Some(writer_props)).unwrap();
+        writer.write(&batch).unwrap();
+        writer.close().unwrap();
+
+        let data = buf.into_inner().freeze();
+
+        let reader_opts = match read_page_index {
+            true => ReadOptionsBuilder::new().with_page_index().build(),
+            false => ReadOptionsBuilder::new().build(),
+        };
+        let reader = SerializedFileReader::new_with_options(data.clone(), 
reader_opts).unwrap();
+        let metadata = reader.metadata().clone();
+        TestMetadata {
+            file_size: data.len(),
+            metadata,
+        }
+    }
+
+    fn has_page_index(metadata: &ParquetMetaData) -> bool {
+        match metadata.column_index() {
+            Some(column_index) => column_index
+                .iter()
+                .any(|rg_idx| rg_idx.iter().all(|col_idx| !matches!(col_idx, 
Index::NONE))),
+            None => false,
+        }
+    }
+
+    #[test]
+    fn test_roundtrip_parquet_metadata_without_page_index() {
+        // We currently don't have an ad-hoc ParquetMetadata loader that can 
load page indexes so
+        // we at least test round trip without them
+        let metadata = get_test_metadata(false, false);
+        assert!(!has_page_index(&metadata.metadata));
+
+        let mut buf = BytesMut::new().writer();
+        {
+            let mut writer = ParquetMetadataWriter::new(&mut buf, 
&metadata.metadata);
+            writer.finish().unwrap();
+        }
+
+        let data = buf.into_inner().freeze();
+
+        let decoded_metadata = parse_metadata(&data).unwrap();
+        assert!(!has_page_index(&metadata.metadata));
+
+        assert_eq!(metadata.metadata, decoded_metadata);
+    }
+
+    /// Temporary function so we can test loading metadata with page indexes
+    /// while we haven't fully figured out how to load it cleanly
+    #[cfg(feature = "async")]
+    async fn load_metadata_from_bytes(file_size: usize, data: Bytes) -> 
ParquetMetaData {
+        use std::ops::Range;
+        use bytes::Bytes;
+        use futures::future::BoxFuture;
+        use futures::FutureExt;
+        use crate::arrow::async_reader::{MetadataFetch, MetadataLoader};
+        use crate::errors::Result as ParquetResult;
+
+        /// Adapt a `Bytes` to a `MetadataFetch` implementation.
+        struct AsyncBytes {
+            data: Bytes,
+        }
+
+        impl AsyncBytes {
+            fn new(data: Bytes) -> Self {
+                Self { data }
+            }
+        }
+
+        impl MetadataFetch for AsyncBytes {
+            fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, 
ParquetResult<Bytes>> {
+                async move { Ok(self.data.slice(range.start..range.end)) 
}.boxed()
+            }
+        }
+
+        /// A `MetadataFetch` implementation that reads from a subset of the 
full data
+        /// while accepting ranges that address the full data.
+        struct MaskedBytes {
+            inner: Box<dyn MetadataFetch + Send>,
+            inner_range: Range<usize>,
+        }
+
+        impl MaskedBytes {
+            fn new(inner: Box<dyn MetadataFetch + Send>, inner_range: 
Range<usize>) -> Self {
+                Self { inner, inner_range }
+            }
+        }
+
+        impl MetadataFetch for &mut MaskedBytes {
+            fn fetch(&mut self, range: Range<usize>) -> BoxFuture<'_, 
ParquetResult<Bytes>> {
+                let inner_range = self.inner_range.clone();
+                println!("inner_range: {:?}", inner_range);
+                println!("range: {:?}", range);
+                assert!(inner_range.start <= range.start && inner_range.end >= 
range.end);
+                let range = range.start - self.inner_range.start..range.end - 
self.inner_range.start;
+                self.inner.fetch(range)
+            }
+        }
+
+        let metadata_length = data.len();
+        let mut reader = MaskedBytes::new(
+            Box::new(AsyncBytes::new(data)),
+            file_size - metadata_length..file_size,
+        );
+        let metadata = MetadataLoader::load(&mut reader, file_size, 
None).await.unwrap();
+        let loaded_metadata = metadata.finish();
+        let mut metadata = MetadataLoader::new(&mut reader, loaded_metadata);
+        metadata.load_page_index(true, true).await.unwrap();
+        metadata.finish()
+    }
+
+    #[tokio::test]
+    #[cfg(feature = "async")]
+    async fn test_encode_parquet_metadata_with_page_index() {
+        // Create a ParquetMetadata with page index information
+        let metadata = get_test_metadata(true, true);
+        assert!(has_page_index(&metadata.metadata));
+
+        let mut buf = BytesMut::new().writer();
+        {
+            let mut writer = ParquetMetadataWriter::new(&mut buf, 
&metadata.metadata);
+            writer.finish().unwrap();
+        }
+
+        let data = buf.into_inner().freeze();
+
+        let decoded_metadata = load_metadata_from_bytes(metadata.file_size, 
data).await;
+
+        assert_eq!(metadata.metadata, decoded_metadata);
+

Review Comment:
   Thank you!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to