This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 2185ce220 Use OffsetIndex to prune IO with RowSelection (#2473)
2185ce220 is described below

commit 2185ce22043953b3185001a04b46f50fbd6956d7
Author: Dan Harris <[email protected]>
AuthorDate: Wed Aug 17 09:38:33 2022 -0400

    Use OffsetIndex to prune IO with RowSelection (#2473)
    
    * Add struct for in-memory row group with only selected pages
    
    * Read only pages required for row selection
    
    * Remove InMemoryColumnChumk and prune IO for row selection
    
    * Review comments
    
    * Unignore test
    
    * Avoid copies
    
    * Fix docs
    
    * Linting
---
 parquet/src/arrow/arrow_reader/selection.rs | 147 +++++++-
 parquet/src/arrow/async_reader.rs           | 530 +++++++++++++---------------
 parquet/src/column/page.rs                  |   1 +
 parquet/src/file/page_index/index_reader.rs |   2 +-
 4 files changed, 384 insertions(+), 296 deletions(-)

diff --git a/parquet/src/arrow/arrow_reader/selection.rs 
b/parquet/src/arrow/arrow_reader/selection.rs
index 8e129f566..72867e891 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -17,12 +17,14 @@
 
 use arrow::array::{Array, BooleanArray};
 use arrow::compute::SlicesIterator;
+use parquet_format::PageLocation;
 use std::cmp::Ordering;
 use std::collections::VecDeque;
 use std::ops::Range;
 
-/// [`RowSelector`] represents a range of rows to scan from a parquet file
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+/// [`RowSelection`] is a collection of [`RowSelector`] used to skip rows when
+/// scanning a parquet file
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
 pub struct RowSelector {
     /// The number of rows
     pub row_count: usize,
@@ -116,6 +118,57 @@ impl RowSelection {
         Self { selectors }
     }
 
+    /// Given an offset index, return the offset ranges for all data pages 
selected by `self`
+    pub(crate) fn scan_ranges(
+        &self,
+        page_locations: &[PageLocation],
+    ) -> Vec<Range<usize>> {
+        let mut ranges = vec![];
+        let mut row_offset = 0;
+
+        let mut pages = page_locations.iter().peekable();
+        let mut selectors = self.selectors.iter().cloned();
+        let mut current_selector = selectors.next();
+        let mut current_page = pages.next();
+
+        let mut current_page_included = false;
+
+        while let Some((selector, page)) = 
current_selector.as_mut().zip(current_page) {
+            if !(selector.skip || current_page_included) {
+                let start = page.offset as usize;
+                let end = start + page.compressed_page_size as usize;
+                ranges.push(start..end);
+                current_page_included = true;
+            }
+
+            if let Some(next_page) = pages.peek() {
+                if row_offset + selector.row_count > next_page.first_row_index 
as usize {
+                    let remaining_in_page =
+                        next_page.first_row_index as usize - row_offset;
+                    selector.row_count -= remaining_in_page;
+                    row_offset += remaining_in_page;
+                    current_page = pages.next();
+                    current_page_included = false;
+
+                    continue;
+                } else {
+                    if row_offset + selector.row_count
+                        == next_page.first_row_index as usize
+                    {
+                        current_page = pages.next();
+                        current_page_included = false;
+                    }
+                    row_offset += selector.row_count;
+                    current_selector = selectors.next();
+                }
+            } else {
+                break;
+            }
+        }
+
+        ranges
+    }
+
     /// Splits off the first `row_count` from this [`RowSelection`]
     pub fn split_off(&mut self, row_count: usize) -> Self {
         let mut total_count = 0;
@@ -162,7 +215,7 @@ impl RowSelection {
     /// self:     NNNNNNNNNNNNYYYYYYYYYYYYYYYYYYYYYYNNNYYYYY
     /// other:                YYYYYNNNNYYYYYYYYYYYYY   YYNNN
     ///
-    /// returned: NNNNNNNNNNNNYYYYYNNNNYYYYYYYYYYYYYYNNYNNNN
+    /// returned: NNNNNNNNNNNNYYYYYNNNNYYYYYYYYYYYYYNNNYYNNN
     ///
     ///
     pub fn and_then(&self, other: &Self) -> Self {
@@ -423,4 +476,92 @@ mod tests {
             assert_eq!(a.and_then(&b), expected);
         }
     }
+
+    #[test]
+    fn test_scan_ranges() {
+        let index = vec![
+            PageLocation {
+                offset: 0,
+                compressed_page_size: 10,
+                first_row_index: 0,
+            },
+            PageLocation {
+                offset: 10,
+                compressed_page_size: 10,
+                first_row_index: 10,
+            },
+            PageLocation {
+                offset: 20,
+                compressed_page_size: 10,
+                first_row_index: 20,
+            },
+            PageLocation {
+                offset: 30,
+                compressed_page_size: 10,
+                first_row_index: 30,
+            },
+            PageLocation {
+                offset: 40,
+                compressed_page_size: 10,
+                first_row_index: 40,
+            },
+            PageLocation {
+                offset: 50,
+                compressed_page_size: 10,
+                first_row_index: 50,
+            },
+            PageLocation {
+                offset: 60,
+                compressed_page_size: 10,
+                first_row_index: 60,
+            },
+        ];
+
+        let selection = RowSelection::from(vec![
+            // Skip first page
+            RowSelector::skip(10),
+            // Multiple selects in same page
+            RowSelector::select(3),
+            RowSelector::skip(3),
+            RowSelector::select(4),
+            // Select to page boundary
+            RowSelector::skip(5),
+            RowSelector::select(5),
+            // Skip full page past page boundary
+            RowSelector::skip(12),
+            // Select across page boundaries
+            RowSelector::select(12),
+            // Skip final page
+            RowSelector::skip(12),
+        ]);
+
+        let ranges = selection.scan_ranges(&index);
+
+        // assert_eq!(mask, vec![false, true, true, false, true, true, false]);
+        assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60]);
+
+        let selection = RowSelection::from(vec![
+            // Skip first page
+            RowSelector::skip(10),
+            // Multiple selects in same page
+            RowSelector::select(3),
+            RowSelector::skip(3),
+            RowSelector::select(4),
+            // Select to page boundary
+            RowSelector::skip(5),
+            RowSelector::select(5),
+            // Skip full page past page boundary
+            RowSelector::skip(12),
+            // Select across page boundaries
+            RowSelector::select(12),
+            RowSelector::skip(1),
+            // Select across page boundaries including final page
+            RowSelector::select(8),
+        ]);
+
+        let ranges = selection.scan_ranges(&index);
+
+        // assert_eq!(mask, vec![false, true, true, false, true, true, true]);
+        assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]);
+    }
 }
diff --git a/parquet/src/arrow/async_reader.rs 
b/parquet/src/arrow/async_reader.rs
index 6c449bef4..090b9514d 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -78,17 +78,17 @@
 use std::collections::VecDeque;
 use std::fmt::Formatter;
 
-use std::io::{Cursor, SeekFrom};
+use std::io::SeekFrom;
 use std::ops::Range;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 
-use bytes::Bytes;
+use bytes::{Buf, Bytes};
 use futures::future::{BoxFuture, FutureExt};
 use futures::ready;
 use futures::stream::Stream;
-use parquet_format::{PageHeader, PageType};
+
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
 
 use arrow::datatypes::SchemaRef;
@@ -100,14 +100,16 @@ use crate::arrow::arrow_reader::{
     RowFilter, RowSelection,
 };
 use crate::arrow::ProjectionMask;
-use crate::basic::Compression;
-use crate::column::page::{Page, PageIterator, PageMetadata, PageReader};
-use crate::compression::{create_codec, Codec};
+
+use crate::column::page::{PageIterator, PageReader};
+
 use crate::errors::{ParquetError, Result};
 use crate::file::footer::{decode_footer, decode_metadata};
 use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
-use crate::file::serialized_reader::{decode_page, read_page_header};
+use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
+
 use crate::file::FOOTER_SIZE;
+
 use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
 
 /// The asynchronous interface used by [`ParquetRecordBatchStream`] to read 
parquet files
@@ -286,7 +288,8 @@ where
 
         let meta = self.metadata.row_group(row_group_idx);
         let mut row_group = InMemoryRowGroup {
-            schema: meta.schema_descr_ptr(),
+            metadata: meta,
+            // schema: meta.schema_descr_ptr(),
             row_count: meta.num_rows() as usize,
             column_chunks: vec![None; meta.columns().len()],
         };
@@ -299,12 +302,7 @@ where
 
                 let predicate_projection = predicate.projection().clone();
                 row_group
-                    .fetch(
-                        &mut self.input,
-                        meta,
-                        &predicate_projection,
-                        selection.as_ref(),
-                    )
+                    .fetch(&mut self.input, &predicate_projection, 
selection.as_ref())
                     .await?;
 
                 let array_reader = build_array_reader(
@@ -327,7 +325,7 @@ where
         }
 
         row_group
-            .fetch(&mut self.input, meta, &projection, selection.as_ref())
+            .fetch(&mut self.input, &projection, selection.as_ref())
             .await?;
 
         let reader = ParquetRecordBatchReader::new(
@@ -471,62 +469,101 @@ where
 }
 
 /// An in-memory collection of column chunks
-struct InMemoryRowGroup {
-    schema: SchemaDescPtr,
-    column_chunks: Vec<Option<InMemoryColumnChunk>>,
+struct InMemoryRowGroup<'a> {
+    metadata: &'a RowGroupMetaData,
+    column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
     row_count: usize,
 }
 
-impl InMemoryRowGroup {
+impl<'a> InMemoryRowGroup<'a> {
     /// Fetches the necessary column data into memory
     async fn fetch<T: AsyncFileReader + Send>(
         &mut self,
         input: &mut T,
-        metadata: &RowGroupMetaData,
         projection: &ProjectionMask,
-        _selection: Option<&RowSelection>,
+        selection: Option<&RowSelection>,
     ) -> Result<()> {
-        // TODO: Use OffsetIndex and selection to prune pages
-
-        let fetch_ranges = self
-            .column_chunks
-            .iter()
-            .enumerate()
-            .into_iter()
-            .filter_map(|(idx, chunk)| {
-                (chunk.is_none() && projection.leaf_included(idx)).then(|| {
-                    let column = metadata.column(idx);
-                    let (start, length) = column.byte_range();
-                    start as usize..(start + length) as usize
+        if let Some((selection, page_locations)) =
+            selection.zip(self.metadata.page_offset_index().as_ref())
+        {
+            // If we have a `RowSelection` and an `OffsetIndex` then only 
fetch pages required for the
+            // `RowSelection`
+            let mut page_start_offsets: Vec<Vec<usize>> = vec![];
+
+            let fetch_ranges = self
+                .column_chunks
+                .iter()
+                .enumerate()
+                .into_iter()
+                .filter_map(|(idx, chunk)| {
+                    (chunk.is_none() && projection.leaf_included(idx)).then(|| 
{
+                        let ranges = 
selection.scan_ranges(&page_locations[idx]);
+                        page_start_offsets
+                            .push(ranges.iter().map(|range| 
range.start).collect());
+                        ranges
+                    })
                 })
-            })
-            .collect();
+                .flatten()
+                .collect();
 
-        let mut chunk_data = 
input.get_byte_ranges(fetch_ranges).await?.into_iter();
+            let mut chunk_data = 
input.get_byte_ranges(fetch_ranges).await?.into_iter();
+            let mut page_start_offsets = page_start_offsets.into_iter();
 
-        for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
-            if chunk.is_some() || !projection.leaf_included(idx) {
-                continue;
+            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
+                if chunk.is_some() || !projection.leaf_included(idx) {
+                    continue;
+                }
+
+                if let Some(offsets) = page_start_offsets.next() {
+                    let mut chunks = Vec::with_capacity(offsets.len());
+                    for _ in 0..offsets.len() {
+                        chunks.push(chunk_data.next().unwrap());
+                    }
+
+                    *chunk = Some(Arc::new(ColumnChunkData::Sparse {
+                        length: self.metadata.column(idx).byte_range().1 as 
usize,
+                        data: 
offsets.into_iter().zip(chunks.into_iter()).collect(),
+                    }))
+                }
             }
+        } else {
+            let fetch_ranges = self
+                .column_chunks
+                .iter()
+                .enumerate()
+                .into_iter()
+                .filter_map(|(idx, chunk)| {
+                    (chunk.is_none() && projection.leaf_included(idx)).then(|| 
{
+                        let column = self.metadata.column(idx);
+                        let (start, length) = column.byte_range();
+                        start as usize..(start + length) as usize
+                    })
+                })
+                .collect();
 
-            let column = metadata.column(idx);
+            let mut chunk_data = 
input.get_byte_ranges(fetch_ranges).await?.into_iter();
 
-            if let Some(data) = chunk_data.next() {
-                *chunk = Some(InMemoryColumnChunk {
-                    num_values: column.num_values(),
-                    compression: column.compression(),
-                    physical_type: column.column_type(),
-                    data,
-                });
+            for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
+                if chunk.is_some() || !projection.leaf_included(idx) {
+                    continue;
+                }
+
+                if let Some(data) = chunk_data.next() {
+                    *chunk = Some(Arc::new(ColumnChunkData::Dense {
+                        offset: self.metadata.column(idx).byte_range().0 as 
usize,
+                        data,
+                    }));
+                }
             }
         }
+
         Ok(())
     }
 }
 
-impl RowGroupCollection for InMemoryRowGroup {
+impl<'a> RowGroupCollection for InMemoryRowGroup<'a> {
     fn schema(&self) -> SchemaDescPtr {
-        self.schema.clone()
+        self.metadata.schema_descr_ptr()
     }
 
     fn num_rows(&self) -> usize {
@@ -534,158 +571,79 @@ impl RowGroupCollection for InMemoryRowGroup {
     }
 
     fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
-        let page_reader = self.column_chunks[i].as_ref().unwrap().pages();
-
-        Ok(Box::new(ColumnChunkIterator {
-            schema: self.schema.clone(),
-            column_schema: self.schema.columns()[i].clone(),
-            reader: Some(page_reader),
-        }))
+        match &self.column_chunks[i] {
+            None => Err(ParquetError::General(format!(
+                "Invalid column index {}, column was not fetched",
+                i
+            ))),
+            Some(data) => {
+                let page_locations = self
+                    .metadata
+                    .page_offset_index()
+                    .as_ref()
+                    .map(|index| index[i].clone());
+                let page_reader: Box<dyn PageReader> =
+                    Box::new(SerializedPageReader::new(
+                        data.clone(),
+                        self.metadata.column(i),
+                        self.row_count,
+                        page_locations,
+                    )?);
+
+                Ok(Box::new(ColumnChunkIterator {
+                    schema: self.metadata.schema_descr_ptr(),
+                    column_schema: 
self.metadata.schema_descr_ptr().columns()[i].clone(),
+                    reader: Some(Ok(page_reader)),
+                }))
+            }
+        }
     }
 }
 
-/// Data for a single column chunk
+/// An in-memory column chunk
 #[derive(Clone)]
-struct InMemoryColumnChunk {
-    num_values: i64,
-    compression: Compression,
-    physical_type: crate::basic::Type,
-    data: Bytes,
-}
-
-impl InMemoryColumnChunk {
-    fn pages(&self) -> Result<Box<dyn PageReader>> {
-        let page_reader = InMemoryColumnChunkReader::new(self.clone())?;
-        Ok(Box::new(page_reader))
-    }
-}
-
-// A serialized implementation for Parquet [`PageReader`].
-struct InMemoryColumnChunkReader {
-    chunk: InMemoryColumnChunk,
-    decompressor: Option<Box<dyn Codec>>,
-    offset: usize,
-    seen_num_values: i64,
-    // If the next page header has already been "peeked", we will cache it here
-    next_page_header: Option<PageHeader>,
-}
-
-impl InMemoryColumnChunkReader {
-    /// Creates a new serialized page reader from file source.
-    fn new(chunk: InMemoryColumnChunk) -> Result<Self> {
-        let decompressor = create_codec(chunk.compression)?;
-        let result = Self {
-            chunk,
-            decompressor,
-            offset: 0,
-            seen_num_values: 0,
-            next_page_header: None,
-        };
-        Ok(result)
-    }
-}
-
-impl Iterator for InMemoryColumnChunkReader {
-    type Item = Result<Page>;
-
-    fn next(&mut self) -> Option<Self::Item> {
-        self.get_next_page().transpose()
-    }
+enum ColumnChunkData {
+    /// Column chunk data representing only a subset of data pages
+    Sparse {
+        /// Length of the full column chunk
+        length: usize,
+        /// Set of data pages included in this sparse chunk. Each element is a 
tuple
+        /// of (page offset, page data)
+        data: Vec<(usize, Bytes)>,
+    },
+    /// Full column chunk and its offset
+    Dense { offset: usize, data: Bytes },
 }
 
-impl PageReader for InMemoryColumnChunkReader {
-    fn get_next_page(&mut self) -> Result<Option<Page>> {
-        while self.seen_num_values < self.chunk.num_values {
-            let mut cursor = 
Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
-            let page_header = if let Some(page_header) = 
self.next_page_header.take() {
-                // The next page header has already been peeked, so use the 
cached value
-                page_header
-            } else {
-                let page_header = read_page_header(&mut cursor)?;
-                self.offset += cursor.position() as usize;
-                page_header
-            };
-
-            let compressed_size = page_header.compressed_page_size as usize;
-
-            let start_offset = self.offset;
-            let end_offset = self.offset + compressed_size;
-            self.offset = end_offset;
-
-            let buffer = self.chunk.data.slice(start_offset..end_offset);
-
-            let result = match page_header.type_ {
-                PageType::DataPage | PageType::DataPageV2 => {
-                    let decoded = decode_page(
-                        page_header,
-                        buffer.into(),
-                        self.chunk.physical_type,
-                        self.decompressor.as_mut(),
-                    )?;
-                    self.seen_num_values += decoded.num_values() as i64;
-                    decoded
-                }
-                PageType::DictionaryPage => decode_page(
-                    page_header,
-                    buffer.into(),
-                    self.chunk.physical_type,
-                    self.decompressor.as_mut(),
-                )?,
-                _ => {
-                    // For unknown page type (e.g., INDEX_PAGE), skip and read 
next.
-                    continue;
-                }
-            };
-
-            return Ok(Some(result));
+impl Length for ColumnChunkData {
+    fn len(&self) -> u64 {
+        match &self {
+            ColumnChunkData::Sparse { length, .. } => *length as u64,
+            ColumnChunkData::Dense { data, .. } => data.len() as u64,
         }
-
-        // We are at the end of this column chunk and no more page left. 
Return None.
-        Ok(None)
-    }
-
-    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
-        while self.seen_num_values < self.chunk.num_values {
-            return if let Some(buffered_header) = 
self.next_page_header.as_ref() {
-                if let Ok(page_metadata) = buffered_header.try_into() {
-                    Ok(Some(page_metadata))
-                } else {
-                    // For unknown page type (e.g., INDEX_PAGE), skip and read 
next.
-                    self.next_page_header = None;
-                    continue;
-                }
-            } else {
-                let mut cursor = 
Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
-                let page_header = read_page_header(&mut cursor)?;
-                self.offset += cursor.position() as usize;
-
-                let page_metadata = if let Ok(page_metadata) = 
(&page_header).try_into() {
-                    Ok(Some(page_metadata))
-                } else {
-                    // For unknown page type (e.g., INDEX_PAGE), skip and read 
next.
-                    continue;
-                };
-
-                self.next_page_header = Some(page_header);
-                page_metadata
-            };
-        }
-
-        Ok(None)
     }
+}
 
-    fn skip_next_page(&mut self) -> Result<()> {
-        if let Some(buffered_header) = self.next_page_header.take() {
-            // The next page header has already been peeked, so just advance 
the offset
-            self.offset += buffered_header.compressed_page_size as usize;
-        } else {
-            let mut cursor = 
Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
-            let page_header = read_page_header(&mut cursor)?;
-            self.offset += cursor.position() as usize;
-            self.offset += page_header.compressed_page_size as usize;
+impl ChunkReader for ColumnChunkData {
+    type T = bytes::buf::Reader<Bytes>;
+
+    fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
+        match &self {
+            ColumnChunkData::Sparse { data, .. } => data
+                .binary_search_by_key(&start, |(offset, _)| *offset as u64)
+                .map(|idx| data[idx].1.slice(0..length).reader())
+                .map_err(|_| {
+                    ParquetError::General(format!(
+                        "Invalid offset in sparse column chunk data: {}",
+                        start
+                    ))
+                }),
+            ColumnChunkData::Dense { offset, data } => {
+                let start = start as usize - *offset;
+                let end = start + length;
+                Ok(data.slice(start..end).reader())
+            }
         }
-
-        Ok(())
     }
 }
 
@@ -717,11 +675,15 @@ impl PageIterator for ColumnChunkIterator {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::arrow::arrow_reader::{ArrowPredicateFn, 
ParquetRecordBatchReaderBuilder};
-    use crate::arrow::ArrowWriter;
+    use crate::arrow::arrow_reader::{
+        ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
+    };
+    use crate::arrow::{parquet_to_arrow_schema, ArrowWriter};
     use crate::file::footer::parse_metadata;
+    use crate::file::page_index::index_reader;
     use arrow::array::{Array, ArrayRef, Int32Array, StringArray};
     use arrow::error::Result as ArrowResult;
+
     use futures::TryStreamExt;
     use std::sync::Mutex;
 
@@ -797,105 +759,6 @@ mod tests {
         );
     }
 
-    #[tokio::test]
-    async fn test_in_memory_column_chunk_reader() {
-        let testdata = arrow::util::test_util::parquet_test_data();
-        let path = format!("{}/alltypes_plain.parquet", testdata);
-        let data = Bytes::from(std::fs::read(path).unwrap());
-
-        let metadata = crate::file::footer::parse_metadata(&data).unwrap();
-
-        let column_metadata = metadata.row_group(0).column(0);
-
-        let (start, length) = column_metadata.byte_range();
-
-        let column_data = data.slice(start as usize..(start + length) as 
usize);
-
-        let mut reader = InMemoryColumnChunkReader::new(InMemoryColumnChunk {
-            num_values: column_metadata.num_values(),
-            compression: column_metadata.compression(),
-            physical_type: column_metadata.column_type(),
-            data: column_data,
-        })
-        .expect("building reader");
-
-        let first_page = reader
-            .peek_next_page()
-            .expect("peeking first page")
-            .expect("first page is empty");
-
-        assert!(first_page.is_dict);
-        assert_eq!(first_page.num_rows, 0);
-
-        let first_page = reader
-            .get_next_page()
-            .expect("getting first page")
-            .expect("first page is empty");
-
-        assert_eq!(
-            first_page.page_type(),
-            crate::basic::PageType::DICTIONARY_PAGE
-        );
-        assert_eq!(first_page.num_values(), 8);
-
-        let second_page = reader
-            .peek_next_page()
-            .expect("peeking second page")
-            .expect("second page is empty");
-
-        assert!(!second_page.is_dict);
-        assert_eq!(second_page.num_rows, 8);
-
-        let second_page = reader
-            .get_next_page()
-            .expect("getting second page")
-            .expect("second page is empty");
-
-        assert_eq!(second_page.page_type(), crate::basic::PageType::DATA_PAGE);
-        assert_eq!(second_page.num_values(), 8);
-
-        let third_page = reader.peek_next_page().expect("getting third page");
-
-        assert!(third_page.is_none());
-
-        let third_page = reader.get_next_page().expect("getting third page");
-
-        assert!(third_page.is_none());
-    }
-
-    #[tokio::test]
-    async fn test_in_memory_column_chunk_reader_skip_page() {
-        let testdata = arrow::util::test_util::parquet_test_data();
-        let path = format!("{}/alltypes_plain.parquet", testdata);
-        let data = Bytes::from(std::fs::read(path).unwrap());
-
-        let metadata = crate::file::footer::parse_metadata(&data).unwrap();
-
-        let column_metadata = metadata.row_group(0).column(0);
-
-        let (start, length) = column_metadata.byte_range();
-
-        let column_data = data.slice(start as usize..(start + length) as 
usize);
-
-        let mut reader = InMemoryColumnChunkReader::new(InMemoryColumnChunk {
-            num_values: column_metadata.num_values(),
-            compression: column_metadata.compression(),
-            physical_type: column_metadata.column_type(),
-            data: column_data,
-        })
-        .expect("building reader");
-
-        reader.skip_next_page().expect("skipping first page");
-
-        let second_page = reader
-            .get_next_page()
-            .expect("getting second page")
-            .expect("second page is empty");
-
-        assert_eq!(second_page.page_type(), crate::basic::PageType::DATA_PAGE);
-        assert_eq!(second_page.num_values(), 8);
-    }
-
     #[tokio::test]
     async fn test_row_filter() {
         let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
@@ -964,4 +827,87 @@ mod tests {
         // Should only have made 3 requests
         assert_eq!(requests.lock().unwrap().len(), 3);
     }
+
+    #[tokio::test]
+    async fn test_in_memory_row_group_sparse() {
+        let testdata = arrow::util::test_util::parquet_test_data();
+        let path = format!("{}/alltypes_tiny_pages.parquet", testdata);
+        let data = Bytes::from(std::fs::read(path).unwrap());
+
+        let metadata = parse_metadata(&data).unwrap();
+
+        let offset_index =
+            index_reader::read_pages_locations(&data, 
metadata.row_group(0).columns())
+                .expect("reading offset index");
+
+        let mut row_group_meta = metadata.row_group(0).clone();
+        row_group_meta.set_page_offset(offset_index.clone());
+        let metadata =
+            ParquetMetaData::new(metadata.file_metadata().clone(), 
vec![row_group_meta]);
+
+        let metadata = Arc::new(metadata);
+
+        let num_rows = metadata.row_group(0).num_rows();
+
+        assert_eq!(metadata.num_row_groups(), 1);
+
+        let async_reader = TestReader {
+            data: data.clone(),
+            metadata: metadata.clone(),
+            requests: Default::default(),
+        };
+
+        let requests = async_reader.requests.clone();
+        let schema = Arc::new(
+            parquet_to_arrow_schema(metadata.file_metadata().schema_descr(), 
None)
+                .expect("building arrow schema"),
+        );
+
+        let _schema_desc = metadata.file_metadata().schema_descr();
+
+        let projection =
+            ProjectionMask::leaves(metadata.file_metadata().schema_descr(), 
vec![0]);
+
+        let reader_factory = ReaderFactory {
+            metadata,
+            schema,
+            input: async_reader,
+            filter: None,
+        };
+
+        let mut skip = true;
+        let mut pages = offset_index[0].iter().peekable();
+
+        // Setup `RowSelection` so that we can skip every other page
+        let mut selectors = vec![];
+        let mut expected_page_requests: Vec<Range<usize>> = vec![];
+        while let Some(page) = pages.next() {
+            let num_rows = if let Some(next_page) = pages.peek() {
+                next_page.first_row_index - page.first_row_index
+            } else {
+                num_rows - page.first_row_index
+            };
+
+            if skip {
+                selectors.push(RowSelector::skip(num_rows as usize));
+            } else {
+                selectors.push(RowSelector::select(num_rows as usize));
+                let start = page.offset as usize;
+                let end = start + page.compressed_page_size as usize;
+                expected_page_requests.push(start..end);
+            }
+            skip = !skip;
+        }
+
+        let selection = RowSelection::from(selectors);
+
+        let (_factory, _reader) = reader_factory
+            .read_row_group(0, Some(selection), projection, 48)
+            .await
+            .expect("reading row group");
+
+        let requests = requests.lock().unwrap();
+
+        assert_eq!(&requests[..], &expected_page_requests)
+    }
 }
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index 1658797ce..ab2d885a2 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -195,6 +195,7 @@ impl PageWriteSpec {
 }
 
 /// Contains metadata for a page
+#[derive(Clone)]
 pub struct PageMetadata {
     /// The number of rows in this page
     pub num_rows: usize,
diff --git a/parquet/src/file/page_index/index_reader.rs 
b/parquet/src/file/page_index/index_reader.rs
index 33499e742..e3f37fbc6 100644
--- a/parquet/src/file/page_index/index_reader.rs
+++ b/parquet/src/file/page_index/index_reader.rs
@@ -65,7 +65,7 @@ pub fn read_pages_locations<R: ChunkReader>(
     let (offset, total_length) = get_location_offset_and_total_length(chunks)?;
 
     //read all need data into buffer
-    let mut reader = reader.get_read(offset, reader.len() as usize)?;
+    let mut reader = reader.get_read(offset, total_length)?;
     let mut data = vec![0; total_length];
     reader.read_exact(&mut data)?;
 

Reply via email to