This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 2185ce220 Use OffsetIndex to prune IO with RowSelection (#2473)
2185ce220 is described below
commit 2185ce22043953b3185001a04b46f50fbd6956d7
Author: Dan Harris <[email protected]>
AuthorDate: Wed Aug 17 09:38:33 2022 -0400
Use OffsetIndex to prune IO with RowSelection (#2473)
* Add struct for in-memory row group with only selected pages
* Read only pages required for row selection
* Remove InMemoryColumnChunk and prune IO for row selection
* Review comments
* Unignore test
* Avoid copies
* Fix docs
* Linting
---
parquet/src/arrow/arrow_reader/selection.rs | 147 +++++++-
parquet/src/arrow/async_reader.rs | 530 +++++++++++++---------------
parquet/src/column/page.rs | 1 +
parquet/src/file/page_index/index_reader.rs | 2 +-
4 files changed, 384 insertions(+), 296 deletions(-)
diff --git a/parquet/src/arrow/arrow_reader/selection.rs
b/parquet/src/arrow/arrow_reader/selection.rs
index 8e129f566..72867e891 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -17,12 +17,14 @@
use arrow::array::{Array, BooleanArray};
use arrow::compute::SlicesIterator;
+use parquet_format::PageLocation;
use std::cmp::Ordering;
use std::collections::VecDeque;
use std::ops::Range;
-/// [`RowSelector`] represents a range of rows to scan from a parquet file
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+/// [`RowSelection`] is a collection of [`RowSelector`] used to skip rows when
+/// scanning a parquet file
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct RowSelector {
/// The number of rows
pub row_count: usize,
@@ -116,6 +118,57 @@ impl RowSelection {
Self { selectors }
}
+ /// Given an offset index, return the offset ranges for all data pages
selected by `self`
+ pub(crate) fn scan_ranges(
+ &self,
+ page_locations: &[PageLocation],
+ ) -> Vec<Range<usize>> {
+ let mut ranges = vec![];
+ let mut row_offset = 0;
+
+ let mut pages = page_locations.iter().peekable();
+ let mut selectors = self.selectors.iter().cloned();
+ let mut current_selector = selectors.next();
+ let mut current_page = pages.next();
+
+ let mut current_page_included = false;
+
+ while let Some((selector, page)) =
current_selector.as_mut().zip(current_page) {
+ if !(selector.skip || current_page_included) {
+ let start = page.offset as usize;
+ let end = start + page.compressed_page_size as usize;
+ ranges.push(start..end);
+ current_page_included = true;
+ }
+
+ if let Some(next_page) = pages.peek() {
+ if row_offset + selector.row_count > next_page.first_row_index
as usize {
+ let remaining_in_page =
+ next_page.first_row_index as usize - row_offset;
+ selector.row_count -= remaining_in_page;
+ row_offset += remaining_in_page;
+ current_page = pages.next();
+ current_page_included = false;
+
+ continue;
+ } else {
+ if row_offset + selector.row_count
+ == next_page.first_row_index as usize
+ {
+ current_page = pages.next();
+ current_page_included = false;
+ }
+ row_offset += selector.row_count;
+ current_selector = selectors.next();
+ }
+ } else {
+ break;
+ }
+ }
+
+ ranges
+ }
+
/// Splits off the first `row_count` from this [`RowSelection`]
pub fn split_off(&mut self, row_count: usize) -> Self {
let mut total_count = 0;
@@ -162,7 +215,7 @@ impl RowSelection {
/// self: NNNNNNNNNNNNYYYYYYYYYYYYYYYYYYYYYYNNNYYYYY
/// other: YYYYYNNNNYYYYYYYYYYYYY YYNNN
///
- /// returned: NNNNNNNNNNNNYYYYYNNNNYYYYYYYYYYYYYYNNYNNNN
+ /// returned: NNNNNNNNNNNNYYYYYNNNNYYYYYYYYYYYYYNNNYYNNN
///
///
pub fn and_then(&self, other: &Self) -> Self {
@@ -423,4 +476,92 @@ mod tests {
assert_eq!(a.and_then(&b), expected);
}
}
+
+ #[test]
+ fn test_scan_ranges() {
+ let index = vec![
+ PageLocation {
+ offset: 0,
+ compressed_page_size: 10,
+ first_row_index: 0,
+ },
+ PageLocation {
+ offset: 10,
+ compressed_page_size: 10,
+ first_row_index: 10,
+ },
+ PageLocation {
+ offset: 20,
+ compressed_page_size: 10,
+ first_row_index: 20,
+ },
+ PageLocation {
+ offset: 30,
+ compressed_page_size: 10,
+ first_row_index: 30,
+ },
+ PageLocation {
+ offset: 40,
+ compressed_page_size: 10,
+ first_row_index: 40,
+ },
+ PageLocation {
+ offset: 50,
+ compressed_page_size: 10,
+ first_row_index: 50,
+ },
+ PageLocation {
+ offset: 60,
+ compressed_page_size: 10,
+ first_row_index: 60,
+ },
+ ];
+
+ let selection = RowSelection::from(vec![
+ // Skip first page
+ RowSelector::skip(10),
+ // Multiple selects in same page
+ RowSelector::select(3),
+ RowSelector::skip(3),
+ RowSelector::select(4),
+ // Select to page boundary
+ RowSelector::skip(5),
+ RowSelector::select(5),
+ // Skip full page past page boundary
+ RowSelector::skip(12),
+ // Select across page boundaries
+ RowSelector::select(12),
+ // Skip final page
+ RowSelector::skip(12),
+ ]);
+
+ let ranges = selection.scan_ranges(&index);
+
+ // assert_eq!(mask, vec![false, true, true, false, true, true, false]);
+ assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60]);
+
+ let selection = RowSelection::from(vec![
+ // Skip first page
+ RowSelector::skip(10),
+ // Multiple selects in same page
+ RowSelector::select(3),
+ RowSelector::skip(3),
+ RowSelector::select(4),
+ // Select to page boundary
+ RowSelector::skip(5),
+ RowSelector::select(5),
+ // Skip full page past page boundary
+ RowSelector::skip(12),
+ // Select across page boundaries
+ RowSelector::select(12),
+ RowSelector::skip(1),
+ // Select across page boundaries including final page
+ RowSelector::select(8),
+ ]);
+
+ let ranges = selection.scan_ranges(&index);
+
+ // assert_eq!(mask, vec![false, true, true, false, true, true, true]);
+ assert_eq!(ranges, vec![10..20, 20..30, 40..50, 50..60, 60..70]);
+ }
}
diff --git a/parquet/src/arrow/async_reader.rs
b/parquet/src/arrow/async_reader.rs
index 6c449bef4..090b9514d 100644
--- a/parquet/src/arrow/async_reader.rs
+++ b/parquet/src/arrow/async_reader.rs
@@ -78,17 +78,17 @@
use std::collections::VecDeque;
use std::fmt::Formatter;
-use std::io::{Cursor, SeekFrom};
+use std::io::SeekFrom;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
-use bytes::Bytes;
+use bytes::{Buf, Bytes};
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
use futures::stream::Stream;
-use parquet_format::{PageHeader, PageType};
+
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
use arrow::datatypes::SchemaRef;
@@ -100,14 +100,16 @@ use crate::arrow::arrow_reader::{
RowFilter, RowSelection,
};
use crate::arrow::ProjectionMask;
-use crate::basic::Compression;
-use crate::column::page::{Page, PageIterator, PageMetadata, PageReader};
-use crate::compression::{create_codec, Codec};
+
+use crate::column::page::{PageIterator, PageReader};
+
use crate::errors::{ParquetError, Result};
use crate::file::footer::{decode_footer, decode_metadata};
use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
-use crate::file::serialized_reader::{decode_page, read_page_header};
+use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
+
use crate::file::FOOTER_SIZE;
+
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
/// The asynchronous interface used by [`ParquetRecordBatchStream`] to read
parquet files
@@ -286,7 +288,8 @@ where
let meta = self.metadata.row_group(row_group_idx);
let mut row_group = InMemoryRowGroup {
- schema: meta.schema_descr_ptr(),
+ metadata: meta,
+ // schema: meta.schema_descr_ptr(),
row_count: meta.num_rows() as usize,
column_chunks: vec![None; meta.columns().len()],
};
@@ -299,12 +302,7 @@ where
let predicate_projection = predicate.projection().clone();
row_group
- .fetch(
- &mut self.input,
- meta,
- &predicate_projection,
- selection.as_ref(),
- )
+ .fetch(&mut self.input, &predicate_projection,
selection.as_ref())
.await?;
let array_reader = build_array_reader(
@@ -327,7 +325,7 @@ where
}
row_group
- .fetch(&mut self.input, meta, &projection, selection.as_ref())
+ .fetch(&mut self.input, &projection, selection.as_ref())
.await?;
let reader = ParquetRecordBatchReader::new(
@@ -471,62 +469,101 @@ where
}
/// An in-memory collection of column chunks
-struct InMemoryRowGroup {
- schema: SchemaDescPtr,
- column_chunks: Vec<Option<InMemoryColumnChunk>>,
+struct InMemoryRowGroup<'a> {
+ metadata: &'a RowGroupMetaData,
+ column_chunks: Vec<Option<Arc<ColumnChunkData>>>,
row_count: usize,
}
-impl InMemoryRowGroup {
+impl<'a> InMemoryRowGroup<'a> {
/// Fetches the necessary column data into memory
async fn fetch<T: AsyncFileReader + Send>(
&mut self,
input: &mut T,
- metadata: &RowGroupMetaData,
projection: &ProjectionMask,
- _selection: Option<&RowSelection>,
+ selection: Option<&RowSelection>,
) -> Result<()> {
- // TODO: Use OffsetIndex and selection to prune pages
-
- let fetch_ranges = self
- .column_chunks
- .iter()
- .enumerate()
- .into_iter()
- .filter_map(|(idx, chunk)| {
- (chunk.is_none() && projection.leaf_included(idx)).then(|| {
- let column = metadata.column(idx);
- let (start, length) = column.byte_range();
- start as usize..(start + length) as usize
+ if let Some((selection, page_locations)) =
+ selection.zip(self.metadata.page_offset_index().as_ref())
+ {
+ // If we have a `RowSelection` and an `OffsetIndex` then only
fetch pages required for the
+ // `RowSelection`
+ let mut page_start_offsets: Vec<Vec<usize>> = vec![];
+
+ let fetch_ranges = self
+ .column_chunks
+ .iter()
+ .enumerate()
+ .into_iter()
+ .filter_map(|(idx, chunk)| {
+ (chunk.is_none() && projection.leaf_included(idx)).then(||
{
+ let ranges =
selection.scan_ranges(&page_locations[idx]);
+ page_start_offsets
+ .push(ranges.iter().map(|range|
range.start).collect());
+ ranges
+ })
})
- })
- .collect();
+ .flatten()
+ .collect();
- let mut chunk_data =
input.get_byte_ranges(fetch_ranges).await?.into_iter();
+ let mut chunk_data =
input.get_byte_ranges(fetch_ranges).await?.into_iter();
+ let mut page_start_offsets = page_start_offsets.into_iter();
- for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
- if chunk.is_some() || !projection.leaf_included(idx) {
- continue;
+ for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
+ if chunk.is_some() || !projection.leaf_included(idx) {
+ continue;
+ }
+
+ if let Some(offsets) = page_start_offsets.next() {
+ let mut chunks = Vec::with_capacity(offsets.len());
+ for _ in 0..offsets.len() {
+ chunks.push(chunk_data.next().unwrap());
+ }
+
+ *chunk = Some(Arc::new(ColumnChunkData::Sparse {
+ length: self.metadata.column(idx).byte_range().1 as
usize,
+ data:
offsets.into_iter().zip(chunks.into_iter()).collect(),
+ }))
+ }
}
+ } else {
+ let fetch_ranges = self
+ .column_chunks
+ .iter()
+ .enumerate()
+ .into_iter()
+ .filter_map(|(idx, chunk)| {
+ (chunk.is_none() && projection.leaf_included(idx)).then(||
{
+ let column = self.metadata.column(idx);
+ let (start, length) = column.byte_range();
+ start as usize..(start + length) as usize
+ })
+ })
+ .collect();
- let column = metadata.column(idx);
+ let mut chunk_data =
input.get_byte_ranges(fetch_ranges).await?.into_iter();
- if let Some(data) = chunk_data.next() {
- *chunk = Some(InMemoryColumnChunk {
- num_values: column.num_values(),
- compression: column.compression(),
- physical_type: column.column_type(),
- data,
- });
+ for (idx, chunk) in self.column_chunks.iter_mut().enumerate() {
+ if chunk.is_some() || !projection.leaf_included(idx) {
+ continue;
+ }
+
+ if let Some(data) = chunk_data.next() {
+ *chunk = Some(Arc::new(ColumnChunkData::Dense {
+ offset: self.metadata.column(idx).byte_range().0 as
usize,
+ data,
+ }));
+ }
}
}
+
Ok(())
}
}
-impl RowGroupCollection for InMemoryRowGroup {
+impl<'a> RowGroupCollection for InMemoryRowGroup<'a> {
fn schema(&self) -> SchemaDescPtr {
- self.schema.clone()
+ self.metadata.schema_descr_ptr()
}
fn num_rows(&self) -> usize {
@@ -534,158 +571,79 @@ impl RowGroupCollection for InMemoryRowGroup {
}
fn column_chunks(&self, i: usize) -> Result<Box<dyn PageIterator>> {
- let page_reader = self.column_chunks[i].as_ref().unwrap().pages();
-
- Ok(Box::new(ColumnChunkIterator {
- schema: self.schema.clone(),
- column_schema: self.schema.columns()[i].clone(),
- reader: Some(page_reader),
- }))
+ match &self.column_chunks[i] {
+ None => Err(ParquetError::General(format!(
+ "Invalid column index {}, column was not fetched",
+ i
+ ))),
+ Some(data) => {
+ let page_locations = self
+ .metadata
+ .page_offset_index()
+ .as_ref()
+ .map(|index| index[i].clone());
+ let page_reader: Box<dyn PageReader> =
+ Box::new(SerializedPageReader::new(
+ data.clone(),
+ self.metadata.column(i),
+ self.row_count,
+ page_locations,
+ )?);
+
+ Ok(Box::new(ColumnChunkIterator {
+ schema: self.metadata.schema_descr_ptr(),
+ column_schema:
self.metadata.schema_descr_ptr().columns()[i].clone(),
+ reader: Some(Ok(page_reader)),
+ }))
+ }
+ }
}
}
-/// Data for a single column chunk
+/// An in-memory column chunk
#[derive(Clone)]
-struct InMemoryColumnChunk {
- num_values: i64,
- compression: Compression,
- physical_type: crate::basic::Type,
- data: Bytes,
-}
-
-impl InMemoryColumnChunk {
- fn pages(&self) -> Result<Box<dyn PageReader>> {
- let page_reader = InMemoryColumnChunkReader::new(self.clone())?;
- Ok(Box::new(page_reader))
- }
-}
-
-// A serialized implementation for Parquet [`PageReader`].
-struct InMemoryColumnChunkReader {
- chunk: InMemoryColumnChunk,
- decompressor: Option<Box<dyn Codec>>,
- offset: usize,
- seen_num_values: i64,
- // If the next page header has already been "peeked", we will cache it here
- next_page_header: Option<PageHeader>,
-}
-
-impl InMemoryColumnChunkReader {
- /// Creates a new serialized page reader from file source.
- fn new(chunk: InMemoryColumnChunk) -> Result<Self> {
- let decompressor = create_codec(chunk.compression)?;
- let result = Self {
- chunk,
- decompressor,
- offset: 0,
- seen_num_values: 0,
- next_page_header: None,
- };
- Ok(result)
- }
-}
-
-impl Iterator for InMemoryColumnChunkReader {
- type Item = Result<Page>;
-
- fn next(&mut self) -> Option<Self::Item> {
- self.get_next_page().transpose()
- }
+enum ColumnChunkData {
+ /// Column chunk data representing only a subset of data pages
+ Sparse {
+ /// Length of the full column chunk
+ length: usize,
+ /// Set of data pages included in this sparse chunk. Each element is a
tuple
+ /// of (page offset, page data)
+ data: Vec<(usize, Bytes)>,
+ },
+ /// Full column chunk and its offset
+ Dense { offset: usize, data: Bytes },
}
-impl PageReader for InMemoryColumnChunkReader {
- fn get_next_page(&mut self) -> Result<Option<Page>> {
- while self.seen_num_values < self.chunk.num_values {
- let mut cursor =
Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
- let page_header = if let Some(page_header) =
self.next_page_header.take() {
- // The next page header has already been peeked, so use the
cached value
- page_header
- } else {
- let page_header = read_page_header(&mut cursor)?;
- self.offset += cursor.position() as usize;
- page_header
- };
-
- let compressed_size = page_header.compressed_page_size as usize;
-
- let start_offset = self.offset;
- let end_offset = self.offset + compressed_size;
- self.offset = end_offset;
-
- let buffer = self.chunk.data.slice(start_offset..end_offset);
-
- let result = match page_header.type_ {
- PageType::DataPage | PageType::DataPageV2 => {
- let decoded = decode_page(
- page_header,
- buffer.into(),
- self.chunk.physical_type,
- self.decompressor.as_mut(),
- )?;
- self.seen_num_values += decoded.num_values() as i64;
- decoded
- }
- PageType::DictionaryPage => decode_page(
- page_header,
- buffer.into(),
- self.chunk.physical_type,
- self.decompressor.as_mut(),
- )?,
- _ => {
- // For unknown page type (e.g., INDEX_PAGE), skip and read
next.
- continue;
- }
- };
-
- return Ok(Some(result));
+impl Length for ColumnChunkData {
+ fn len(&self) -> u64 {
+ match &self {
+ ColumnChunkData::Sparse { length, .. } => *length as u64,
+ ColumnChunkData::Dense { data, .. } => data.len() as u64,
}
-
- // We are at the end of this column chunk and no more page left.
Return None.
- Ok(None)
- }
-
- fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
- while self.seen_num_values < self.chunk.num_values {
- return if let Some(buffered_header) =
self.next_page_header.as_ref() {
- if let Ok(page_metadata) = buffered_header.try_into() {
- Ok(Some(page_metadata))
- } else {
- // For unknown page type (e.g., INDEX_PAGE), skip and read
next.
- self.next_page_header = None;
- continue;
- }
- } else {
- let mut cursor =
Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
- let page_header = read_page_header(&mut cursor)?;
- self.offset += cursor.position() as usize;
-
- let page_metadata = if let Ok(page_metadata) =
(&page_header).try_into() {
- Ok(Some(page_metadata))
- } else {
- // For unknown page type (e.g., INDEX_PAGE), skip and read
next.
- continue;
- };
-
- self.next_page_header = Some(page_header);
- page_metadata
- };
- }
-
- Ok(None)
}
+}
- fn skip_next_page(&mut self) -> Result<()> {
- if let Some(buffered_header) = self.next_page_header.take() {
- // The next page header has already been peeked, so just advance
the offset
- self.offset += buffered_header.compressed_page_size as usize;
- } else {
- let mut cursor =
Cursor::new(&self.chunk.data.as_ref()[self.offset..]);
- let page_header = read_page_header(&mut cursor)?;
- self.offset += cursor.position() as usize;
- self.offset += page_header.compressed_page_size as usize;
+impl ChunkReader for ColumnChunkData {
+ type T = bytes::buf::Reader<Bytes>;
+
+ fn get_read(&self, start: u64, length: usize) -> Result<Self::T> {
+ match &self {
+ ColumnChunkData::Sparse { data, .. } => data
+ .binary_search_by_key(&start, |(offset, _)| *offset as u64)
+ .map(|idx| data[idx].1.slice(0..length).reader())
+ .map_err(|_| {
+ ParquetError::General(format!(
+ "Invalid offset in sparse column chunk data: {}",
+ start
+ ))
+ }),
+ ColumnChunkData::Dense { offset, data } => {
+ let start = start as usize - *offset;
+ let end = start + length;
+ Ok(data.slice(start..end).reader())
+ }
}
-
- Ok(())
}
}
@@ -717,11 +675,15 @@ impl PageIterator for ColumnChunkIterator {
#[cfg(test)]
mod tests {
use super::*;
- use crate::arrow::arrow_reader::{ArrowPredicateFn,
ParquetRecordBatchReaderBuilder};
- use crate::arrow::ArrowWriter;
+ use crate::arrow::arrow_reader::{
+ ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowSelector,
+ };
+ use crate::arrow::{parquet_to_arrow_schema, ArrowWriter};
use crate::file::footer::parse_metadata;
+ use crate::file::page_index::index_reader;
use arrow::array::{Array, ArrayRef, Int32Array, StringArray};
use arrow::error::Result as ArrowResult;
+
use futures::TryStreamExt;
use std::sync::Mutex;
@@ -797,105 +759,6 @@ mod tests {
);
}
- #[tokio::test]
- async fn test_in_memory_column_chunk_reader() {
- let testdata = arrow::util::test_util::parquet_test_data();
- let path = format!("{}/alltypes_plain.parquet", testdata);
- let data = Bytes::from(std::fs::read(path).unwrap());
-
- let metadata = crate::file::footer::parse_metadata(&data).unwrap();
-
- let column_metadata = metadata.row_group(0).column(0);
-
- let (start, length) = column_metadata.byte_range();
-
- let column_data = data.slice(start as usize..(start + length) as
usize);
-
- let mut reader = InMemoryColumnChunkReader::new(InMemoryColumnChunk {
- num_values: column_metadata.num_values(),
- compression: column_metadata.compression(),
- physical_type: column_metadata.column_type(),
- data: column_data,
- })
- .expect("building reader");
-
- let first_page = reader
- .peek_next_page()
- .expect("peeking first page")
- .expect("first page is empty");
-
- assert!(first_page.is_dict);
- assert_eq!(first_page.num_rows, 0);
-
- let first_page = reader
- .get_next_page()
- .expect("getting first page")
- .expect("first page is empty");
-
- assert_eq!(
- first_page.page_type(),
- crate::basic::PageType::DICTIONARY_PAGE
- );
- assert_eq!(first_page.num_values(), 8);
-
- let second_page = reader
- .peek_next_page()
- .expect("peeking second page")
- .expect("second page is empty");
-
- assert!(!second_page.is_dict);
- assert_eq!(second_page.num_rows, 8);
-
- let second_page = reader
- .get_next_page()
- .expect("getting second page")
- .expect("second page is empty");
-
- assert_eq!(second_page.page_type(), crate::basic::PageType::DATA_PAGE);
- assert_eq!(second_page.num_values(), 8);
-
- let third_page = reader.peek_next_page().expect("getting third page");
-
- assert!(third_page.is_none());
-
- let third_page = reader.get_next_page().expect("getting third page");
-
- assert!(third_page.is_none());
- }
-
- #[tokio::test]
- async fn test_in_memory_column_chunk_reader_skip_page() {
- let testdata = arrow::util::test_util::parquet_test_data();
- let path = format!("{}/alltypes_plain.parquet", testdata);
- let data = Bytes::from(std::fs::read(path).unwrap());
-
- let metadata = crate::file::footer::parse_metadata(&data).unwrap();
-
- let column_metadata = metadata.row_group(0).column(0);
-
- let (start, length) = column_metadata.byte_range();
-
- let column_data = data.slice(start as usize..(start + length) as
usize);
-
- let mut reader = InMemoryColumnChunkReader::new(InMemoryColumnChunk {
- num_values: column_metadata.num_values(),
- compression: column_metadata.compression(),
- physical_type: column_metadata.column_type(),
- data: column_data,
- })
- .expect("building reader");
-
- reader.skip_next_page().expect("skipping first page");
-
- let second_page = reader
- .get_next_page()
- .expect("getting second page")
- .expect("second page is empty");
-
- assert_eq!(second_page.page_type(), crate::basic::PageType::DATA_PAGE);
- assert_eq!(second_page.num_values(), 8);
- }
-
#[tokio::test]
async fn test_row_filter() {
let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
@@ -964,4 +827,87 @@ mod tests {
// Should only have made 3 requests
assert_eq!(requests.lock().unwrap().len(), 3);
}
+
+ #[tokio::test]
+ async fn test_in_memory_row_group_sparse() {
+ let testdata = arrow::util::test_util::parquet_test_data();
+ let path = format!("{}/alltypes_tiny_pages.parquet", testdata);
+ let data = Bytes::from(std::fs::read(path).unwrap());
+
+ let metadata = parse_metadata(&data).unwrap();
+
+ let offset_index =
+ index_reader::read_pages_locations(&data,
metadata.row_group(0).columns())
+ .expect("reading offset index");
+
+ let mut row_group_meta = metadata.row_group(0).clone();
+ row_group_meta.set_page_offset(offset_index.clone());
+ let metadata =
+ ParquetMetaData::new(metadata.file_metadata().clone(),
vec![row_group_meta]);
+
+ let metadata = Arc::new(metadata);
+
+ let num_rows = metadata.row_group(0).num_rows();
+
+ assert_eq!(metadata.num_row_groups(), 1);
+
+ let async_reader = TestReader {
+ data: data.clone(),
+ metadata: metadata.clone(),
+ requests: Default::default(),
+ };
+
+ let requests = async_reader.requests.clone();
+ let schema = Arc::new(
+ parquet_to_arrow_schema(metadata.file_metadata().schema_descr(),
None)
+ .expect("building arrow schema"),
+ );
+
+ let _schema_desc = metadata.file_metadata().schema_descr();
+
+ let projection =
+ ProjectionMask::leaves(metadata.file_metadata().schema_descr(),
vec![0]);
+
+ let reader_factory = ReaderFactory {
+ metadata,
+ schema,
+ input: async_reader,
+ filter: None,
+ };
+
+ let mut skip = true;
+ let mut pages = offset_index[0].iter().peekable();
+
+ // Setup `RowSelection` so that we can skip every other page
+ let mut selectors = vec![];
+ let mut expected_page_requests: Vec<Range<usize>> = vec![];
+ while let Some(page) = pages.next() {
+ let num_rows = if let Some(next_page) = pages.peek() {
+ next_page.first_row_index - page.first_row_index
+ } else {
+ num_rows - page.first_row_index
+ };
+
+ if skip {
+ selectors.push(RowSelector::skip(num_rows as usize));
+ } else {
+ selectors.push(RowSelector::select(num_rows as usize));
+ let start = page.offset as usize;
+ let end = start + page.compressed_page_size as usize;
+ expected_page_requests.push(start..end);
+ }
+ skip = !skip;
+ }
+
+ let selection = RowSelection::from(selectors);
+
+ let (_factory, _reader) = reader_factory
+ .read_row_group(0, Some(selection), projection, 48)
+ .await
+ .expect("reading row group");
+
+ let requests = requests.lock().unwrap();
+
+ assert_eq!(&requests[..], &expected_page_requests)
+ }
}
diff --git a/parquet/src/column/page.rs b/parquet/src/column/page.rs
index 1658797ce..ab2d885a2 100644
--- a/parquet/src/column/page.rs
+++ b/parquet/src/column/page.rs
@@ -195,6 +195,7 @@ impl PageWriteSpec {
}
/// Contains metadata for a page
+#[derive(Clone)]
pub struct PageMetadata {
/// The number of rows in this page
pub num_rows: usize,
diff --git a/parquet/src/file/page_index/index_reader.rs
b/parquet/src/file/page_index/index_reader.rs
index 33499e742..e3f37fbc6 100644
--- a/parquet/src/file/page_index/index_reader.rs
+++ b/parquet/src/file/page_index/index_reader.rs
@@ -65,7 +65,7 @@ pub fn read_pages_locations<R: ChunkReader>(
let (offset, total_length) = get_location_offset_and_total_length(chunks)?;
//read all need data into buffer
- let mut reader = reader.get_read(offset, reader.len() as usize)?;
+ let mut reader = reader.get_read(offset, total_length)?;
let mut data = vec![0; total_length];
reader.read_exact(&mut data)?;