This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 88fb9234b3 Add `peek_next_page_offset` to `SerializedPageReader` (#6945)
88fb9234b3 is described below

commit 88fb9234b39461f44d6b648800c9f8831a7f9e01
Author: Xiangpeng Hao <[email protected]>
AuthorDate: Thu Jan 9 16:34:10 2025 -0500

    Add `peek_next_page_offset` to `SerializedPageReader` (#6945)
    
    * add peek_next_page_offset
    
    * Update parquet/src/file/serialized_reader.rs
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 parquet/src/file/serialized_reader.rs | 142 ++++++++++++++++++++++++++++++++++
 1 file changed, 142 insertions(+)

diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index a942481f7e..81ba0a6646 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -568,6 +568,63 @@ impl<R: ChunkReader> SerializedPageReader<R> {
             physical_type: meta.column_type(),
         })
     }
+
+    /// Similar to `peek_next_page`, but returns the offset of the next page instead of the page metadata.
+    /// Unlike page metadata, an offset can uniquely identify a page.
+    ///
+    /// This is used when reading parquet with a row filter, where we don't want to decompress the same page twice.
+    /// This function allows us to check whether the next page has already been cached or read.
+    #[cfg(test)]
+    fn peek_next_page_offset(&mut self) -> Result<Option<usize>> {
+        match &mut self.state {
+            SerializedPageReaderState::Values {
+                offset,
+                remaining_bytes,
+                next_page_header,
+            } => {
+                loop {
+                    if *remaining_bytes == 0 {
+                        return Ok(None);
+                    }
+                    return if let Some(header) = next_page_header.as_ref() {
+                        if let Ok(_page_meta) = PageMetadata::try_from(&**header) {
+                            Ok(Some(*offset))
+                        } else {
+                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
+                            *next_page_header = None;
+                            continue;
+                        }
+                    } else {
+                        let mut read = self.reader.get_read(*offset as u64)?;
+                        let (header_len, header) = read_page_header_len(&mut read)?;
+                        *offset += header_len;
+                        *remaining_bytes -= header_len;
+                        let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) {
+                            Ok(Some(*offset))
+                        } else {
+                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
+                            continue;
+                        };
+                        *next_page_header = Some(Box::new(header));
+                        page_meta
+                    };
+                }
+            }
+            SerializedPageReaderState::Pages {
+                page_locations,
+                dictionary_page,
+                ..
+            } => {
+                if let Some(page) = dictionary_page {
+                    Ok(Some(page.offset as usize))
+                } else if let Some(page) = page_locations.front() {
+                    Ok(Some(page.offset as usize))
+                } else {
+                    Ok(None)
+                }
+            }
+        }
+    }
 }
 
 impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
@@ -802,6 +859,8 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
 
 #[cfg(test)]
 mod tests {
+    use std::collections::HashSet;
+
     use bytes::Buf;
 
     use crate::file::properties::{EnabledStatistics, WriterProperties};
@@ -1107,6 +1166,89 @@ mod tests {
         assert_eq!(page_count, 2);
     }
 
+    fn get_serialized_page_reader<R: ChunkReader>(
+        file_reader: &SerializedFileReader<R>,
+        row_group: usize,
+        column: usize,
+    ) -> Result<SerializedPageReader<R>> {
+        let row_group = {
+            let row_group_metadata = file_reader.metadata.row_group(row_group);
+            let props = Arc::clone(&file_reader.props);
+            let f = Arc::clone(&file_reader.chunk_reader);
+            SerializedRowGroupReader::new(
+                f,
+                row_group_metadata,
+                file_reader
+                    .metadata
+                    .offset_index()
+                    .map(|x| x[row_group].as_slice()),
+                props,
+            )?
+        };
+
+        let col = row_group.metadata.column(column);
+
+        let page_locations = row_group
+            .offset_index
+            .map(|x| x[column].page_locations.clone());
+
+        let props = Arc::clone(&row_group.props);
+        SerializedPageReader::new_with_properties(
+            Arc::clone(&row_group.chunk_reader),
+            col,
+            row_group.metadata.num_rows() as usize,
+            page_locations,
+            props,
+        )
+    }
+
+    #[test]
+    fn test_peek_next_page_offset_matches_actual() -> Result<()> {
+        let test_file = get_test_file("alltypes_plain.parquet");
+        let reader = SerializedFileReader::new(test_file)?;
+
+        let mut offset_set = HashSet::new();
+        let num_row_groups = reader.metadata.num_row_groups();
+        for row_group in 0..num_row_groups {
+            let num_columns = reader.metadata.row_group(row_group).num_columns();
+            for column in 0..num_columns {
+                let mut page_reader = get_serialized_page_reader(&reader, row_group, column)?;
+
+                while let Ok(Some(page_offset)) = page_reader.peek_next_page_offset() {
+                    match &page_reader.state {
+                        SerializedPageReaderState::Pages {
+                            page_locations,
+                            dictionary_page,
+                            ..
+                        } => {
+                            if let Some(page) = dictionary_page {
+                                assert_eq!(page.offset as usize, page_offset);
+                            } else if let Some(page) = page_locations.front() {
+                                assert_eq!(page.offset as usize, page_offset);
+                            } else {
+                                unreachable!()
+                            }
+                        }
+                        SerializedPageReaderState::Values {
+                            offset,
+                            next_page_header,
+                            ..
+                        } => {
+                            assert!(next_page_header.is_some());
+                            assert_eq!(*offset, page_offset);
+                        }
+                    }
+                    let page = page_reader.get_next_page()?;
+                    assert!(page.is_some());
+                    let newly_inserted = offset_set.insert(page_offset);
+                    assert!(newly_inserted);
+                }
+            }
+        }
+
+        Ok(())
+    }
+
     #[test]
     fn test_page_iterator() {
         let file = get_test_file("alltypes_plain.parquet");
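
For readers following along: in this commit `peek_next_page_offset` is private and `#[cfg(test)]`-gated, but its doc comment describes the intended use during row-filter reads. Below is a minimal illustrative sketch of that pattern, assuming the method were exposed; the `read_unseen_pages` helper and the `seen` set are hypothetical names, not part of this change.

    use std::collections::HashSet;

    use parquet::column::page::PageReader;
    use parquet::errors::Result;
    use parquet::file::reader::ChunkReader;
    use parquet::file::serialized_reader::SerializedPageReader;

    /// Hypothetical helper (not in this commit): visit each page of a column
    /// chunk at most once, using the page offset as the deduplication key,
    /// since an offset (unlike page metadata) uniquely identifies a page.
    fn read_unseen_pages<R: ChunkReader>(
        reader: &mut SerializedPageReader<R>,
        seen: &mut HashSet<usize>,
    ) -> Result<()> {
        while let Some(offset) = reader.peek_next_page_offset()? {
            if !seen.insert(offset) {
                // Offset already seen (e.g., decompressed by an earlier
                // filter pass): skip rather than decompress again. Note that
                // skip_next_page requires the reader to have been constructed
                // with page locations from the offset index.
                reader.skip_next_page()?;
                continue;
            }
            // First visit to this offset: read (and decompress) the page.
            let _page = reader.get_next_page()?;
        }
        Ok(())
    }

This mirrors what the new test asserts: every peeked offset matches the page that `get_next_page` actually returns, and no offset is ever yielded twice.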
