This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 88fb9234b3 Add `peek_next_page_offset` to `SerializedPageReader` (#6945)
88fb9234b3 is described below
commit 88fb9234b39461f44d6b648800c9f8831a7f9e01
Author: Xiangpeng Hao <[email protected]>
AuthorDate: Thu Jan 9 16:34:10 2025 -0500
Add `peek_next_page_offset` to `SerializedPageReader` (#6945)
* add peek_next_page_offset
* Update parquet/src/file/serialized_reader.rs
Co-authored-by: Andrew Lamb <[email protected]>
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
parquet/src/file/serialized_reader.rs | 142 ++++++++++++++++++++++++++++++++++
1 file changed, 142 insertions(+)
diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs
index a942481f7e..81ba0a6646 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -568,6 +568,63 @@ impl<R: ChunkReader> SerializedPageReader<R> {
            physical_type: meta.column_type(),
        })
    }
+
+    /// Similar to `peek_next_page`, but returns the offset of the next page instead of the page metadata.
+    /// Unlike page metadata, an offset can uniquely identify a page.
+    ///
+    /// This is used when reading Parquet with a row filter, where we don't want to decompress the same page twice.
+    /// This function lets us check whether the next page has already been cached or read.
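+    ///
+    /// A minimal usage sketch (hypothetical; `page_cache` and the surrounding
+    /// logic are illustrative and not part of this crate's API): peek the
+    /// offset first, and only decode the page when it has not been seen before.
+    ///
+    /// ```ignore
+    /// if let Some(offset) = page_reader.peek_next_page_offset()? {
+    ///     if !page_cache.contains_key(&offset) {
+    ///         // Page not cached yet: decode it once and remember it by offset.
+    ///         if let Some(page) = page_reader.get_next_page()? {
+    ///             page_cache.insert(offset, page);
+    ///         }
+    ///     }
+    /// }
+    /// ```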
+    #[cfg(test)]
+    fn peek_next_page_offset(&mut self) -> Result<Option<usize>> {
+        match &mut self.state {
+            SerializedPageReaderState::Values {
+                offset,
+                remaining_bytes,
+                next_page_header,
+            } => {
+                loop {
+                    if *remaining_bytes == 0 {
+                        return Ok(None);
+                    }
+                    return if let Some(header) = next_page_header.as_ref() {
+                        if let Ok(_page_meta) = PageMetadata::try_from(&**header) {
+                            Ok(Some(*offset))
+                        } else {
+                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
+                            *next_page_header = None;
+                            continue;
+                        }
+                    } else {
+                        let mut read = self.reader.get_read(*offset as u64)?;
+                        let (header_len, header) = read_page_header_len(&mut read)?;
+                        *offset += header_len;
+                        *remaining_bytes -= header_len;
+                        let page_meta = if let Ok(_page_meta) = PageMetadata::try_from(&header) {
+                            Ok(Some(*offset))
+                        } else {
+                            // For unknown page type (e.g., INDEX_PAGE), skip and read next.
+                            continue;
+                        };
+                        *next_page_header = Some(Box::new(header));
+                        page_meta
+                    };
+                }
+            }
+            SerializedPageReaderState::Pages {
+                page_locations,
+                dictionary_page,
+                ..
+            } => {
+                if let Some(page) = dictionary_page {
+                    Ok(Some(page.offset as usize))
+                } else if let Some(page) = page_locations.front() {
+                    Ok(Some(page.offset as usize))
+                } else {
+                    Ok(None)
+                }
+            }
+        }
+    }
}
impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
@@ -802,6 +859,8 @@ impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
#[cfg(test)]
mod tests {
+    use std::collections::HashSet;
+
    use bytes::Buf;
    use crate::file::properties::{EnabledStatistics, WriterProperties};
@@ -1107,6 +1166,89 @@ mod tests {
        assert_eq!(page_count, 2);
    }
+    fn get_serialized_page_reader<R: ChunkReader>(
+        file_reader: &SerializedFileReader<R>,
+        row_group: usize,
+        column: usize,
+    ) -> Result<SerializedPageReader<R>> {
+        let row_group = {
+            let row_group_metadata = file_reader.metadata.row_group(row_group);
+            let props = Arc::clone(&file_reader.props);
+            let f = Arc::clone(&file_reader.chunk_reader);
+            SerializedRowGroupReader::new(
+                f,
+                row_group_metadata,
+                file_reader
+                    .metadata
+                    .offset_index()
+                    .map(|x| x[row_group].as_slice()),
+                props,
+            )?
+        };
+
+        let col = row_group.metadata.column(column);
+
+        let page_locations = row_group
+            .offset_index
+            .map(|x| x[column].page_locations.clone());
+
+        let props = Arc::clone(&row_group.props);
+        SerializedPageReader::new_with_properties(
+            Arc::clone(&row_group.chunk_reader),
+            col,
+            row_group.metadata.num_rows() as usize,
+            page_locations,
+            props,
+        )
+    }
+
+    #[test]
+    fn test_peek_next_page_offset_matches_actual() -> Result<()> {
+        let test_file = get_test_file("alltypes_plain.parquet");
+        let reader = SerializedFileReader::new(test_file)?;
+
+        let mut offset_set = HashSet::new();
+        let num_row_groups = reader.metadata.num_row_groups();
+        for row_group in 0..num_row_groups {
+            let num_columns = reader.metadata.row_group(row_group).num_columns();
+            for column in 0..num_columns {
+                let mut page_reader = get_serialized_page_reader(&reader, row_group, column)?;
+
+                while let Ok(Some(page_offset)) = page_reader.peek_next_page_offset() {
+                    match &page_reader.state {
+                        SerializedPageReaderState::Pages {
+                            page_locations,
+                            dictionary_page,
+                            ..
+                        } => {
+                            if let Some(page) = dictionary_page {
+                                assert_eq!(page.offset as usize, page_offset);
+                            } else if let Some(page) = page_locations.front() {
+                                assert_eq!(page.offset as usize, page_offset);
+                            } else {
+                                unreachable!()
+                            }
+                        }
+                        SerializedPageReaderState::Values {
+                            offset,
+                            next_page_header,
+                            ..
+                        } => {
+                            assert!(next_page_header.is_some());
+                            assert_eq!(*offset, page_offset);
+                        }
+                    }
+                    let page = page_reader.get_next_page()?;
+                    assert!(page.is_some());
+                    let newly_inserted = offset_set.insert(page_offset);
+                    assert!(newly_inserted);
+                }
+            }
+        }
+
+        Ok(())
+    }
+
    #[test]
    fn test_page_iterator() {
        let file = get_test_file("alltypes_plain.parquet");