This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new ee2818e92 Support peek_next_page and skip_next_page in
InMemoryPageReader (#2407)
ee2818e92 is described below
commit ee2818e9266f2b0b1acb6432af1891cd505ad92c
Author: Yang Jiang <[email protected]>
AuthorDate: Fri Aug 12 17:27:16 2022 +0800
Support peek_next_page and skip_next_page in InMemoryPageReader (#2407)
* Support peek_next_page and skip_next_page in InMemoryPageReader
* fix comment
---
parquet/src/arrow/record_reader/mod.rs | 182 ++++++++++++++++++++++++++++++
parquet/src/util/test_common/page_util.rs | 31 ++++-
2 files changed, 207 insertions(+), 6 deletions(-)
diff --git a/parquet/src/arrow/record_reader/mod.rs b/parquet/src/arrow/record_reader/mod.rs
index 88d45f3d7..18b4c9e07 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -786,4 +786,186 @@ mod tests {
assert_eq!(record_reader.num_records(), 8);
assert_eq!(record_reader.num_values(), 14);
}
+
+ #[test]
+ fn test_skip_required_records() {
+ // Construct column schema
+ let message_type = "
+ message test_schema {
+ REQUIRED INT32 leaf;
+ }
+ ";
+ let desc = parse_message_type(message_type)
+ .map(|t| SchemaDescriptor::new(Arc::new(t)))
+ .map(|s| s.column(0))
+ .unwrap();
+
+ // Construct record reader
+ let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());
+
+ // First page
+
+ // Records data:
+ // test_schema
+ // leaf: 4
+ // test_schema
+ // leaf: 7
+ // test_schema
+ // leaf: 6
+ // test_schema
+ // leaf: 3
+ // test_schema
+ // leaf: 2
+ {
+ let values = [4, 7, 6, 3, 2];
+ let mut pb = DataPageBuilderImpl::new(desc.clone(), 5, true);
+ pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
+ let page = pb.consume();
+
+ let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
+ record_reader.set_page_reader(page_reader).unwrap();
+ assert_eq!(2, record_reader.skip_records(2).unwrap());
+ assert_eq!(0, record_reader.num_records());
+ assert_eq!(0, record_reader.num_values());
+ assert_eq!(3, record_reader.read_records(3).unwrap());
+ assert_eq!(3, record_reader.num_records());
+ assert_eq!(3, record_reader.num_values());
+ }
+
+ // Second page
+
+ // Records data:
+ // test_schema
+ // leaf: 8
+ // test_schema
+ // leaf: 9
+ {
+ let values = [8, 9];
+ let mut pb = DataPageBuilderImpl::new(desc, 2, true);
+ pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
+ let page = pb.consume();
+
+ let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
+ record_reader.set_page_reader(page_reader).unwrap();
+ assert_eq!(2, record_reader.skip_records(10).unwrap());
+ assert_eq!(3, record_reader.num_records());
+ assert_eq!(3, record_reader.num_values());
+ assert_eq!(0, record_reader.read_records(10).unwrap());
+ }
+
+ let mut bb = Int32BufferBuilder::new(3);
+ bb.append_slice(&[6, 3, 2]);
+ let expected_buffer = bb.finish();
+ assert_eq!(expected_buffer, record_reader.consume_record_data());
+ assert_eq!(None, record_reader.consume_def_levels());
+ assert_eq!(None, record_reader.consume_bitmap());
+ }
+
+ #[test]
+ fn test_skip_optional_records() {
+ // Construct column schema
+ let message_type = "
+ message test_schema {
+ OPTIONAL Group test_struct {
+ OPTIONAL INT32 leaf;
+ }
+ }
+ ";
+
+ let desc = parse_message_type(message_type)
+ .map(|t| SchemaDescriptor::new(Arc::new(t)))
+ .map(|s| s.column(0))
+ .unwrap();
+
+ // Construct record reader
+ let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());
+
+ // First page
+
+ // Records data:
+ // test_schema
+ // test_struct
+ // test_schema
+ // test_struct
+ // leaf: 7
+ // test_schema
+ // test_schema
+ // test_struct
+ // leaf: 6
+ // test_schema
+ // test_struct
+ // leaf: 3
+ {
+ let values = [7, 6, 3];
+ //empty, non-empty, empty, non-empty, non-empty
+ let def_levels = [1i16, 2i16, 0i16, 2i16, 2i16];
+ let mut pb = DataPageBuilderImpl::new(desc.clone(), 5, true);
+ pb.add_def_levels(2, &def_levels);
+ pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
+ let page = pb.consume();
+
+ let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
+ record_reader.set_page_reader(page_reader).unwrap();
+ assert_eq!(2, record_reader.skip_records(2).unwrap());
+ assert_eq!(0, record_reader.num_records());
+ assert_eq!(0, record_reader.num_values());
+ assert_eq!(3, record_reader.read_records(3).unwrap());
+ assert_eq!(3, record_reader.num_records());
+ assert_eq!(3, record_reader.num_values());
+ }
+
+ // Second page
+
+ // Records data:
+ // test_schema
+ // test_schema
+ // test_struct
+ // leaf: 8
+ {
+ let values = [8];
+ //empty, non-empty
+ let def_levels = [0i16, 2i16];
+ let mut pb = DataPageBuilderImpl::new(desc, 2, true);
+ pb.add_def_levels(2, &def_levels);
+ pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
+ let page = pb.consume();
+
+ let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
+ record_reader.set_page_reader(page_reader).unwrap();
+ assert_eq!(2, record_reader.skip_records(10).unwrap());
+ assert_eq!(3, record_reader.num_records());
+ assert_eq!(3, record_reader.num_values());
+ assert_eq!(0, record_reader.read_records(10).unwrap());
+ }
+
+ // Verify result def levels
+ let mut bb = Int16BufferBuilder::new(7);
+ bb.append_slice(&[0i16, 2i16, 2i16]);
+ let expected_def_levels = bb.finish();
+ assert_eq!(
+ Some(expected_def_levels),
+ record_reader.consume_def_levels()
+ );
+
+ // Verify bitmap
+ let expected_valid = &[false, true, true];
+ let expected_buffer = Buffer::from_iter(expected_valid.iter().cloned());
+ let expected_bitmap = Bitmap::from(expected_buffer);
+ assert_eq!(Some(expected_bitmap), record_reader.consume_bitmap());
+
+ // Verify result record data
+ let actual = record_reader.consume_record_data();
+ let actual_values = actual.typed_data::<i32>();
+
+ let expected = &[0, 6, 3];
+ assert_eq!(actual_values.len(), expected.len());
+
+ // Only validate valid values are equal
+ let iter = expected_valid.iter().zip(actual_values).zip(expected);
+ for ((valid, actual), expected) in iter {
+ if *valid {
+ assert_eq!(actual, expected)
+ }
+ }
+ }
}
diff --git a/parquet/src/util/test_common/page_util.rs b/parquet/src/util/test_common/page_util.rs
index dffcb2a44..243fb6f8b 100644
--- a/parquet/src/util/test_common/page_util.rs
+++ b/parquet/src/util/test_common/page_util.rs
@@ -24,6 +24,7 @@ use crate::encodings::levels::LevelEncoder;
use crate::errors::Result;
use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
use crate::util::memory::ByteBufferPtr;
+use std::iter::Peekable;
use std::mem;
pub trait DataPageBuilder {
@@ -127,8 +128,8 @@ impl DataPageBuilder for DataPageBuilderImpl {
encoding: self.encoding.unwrap(),
num_nulls: 0, /* set to dummy value - don't need this when reading
* data page */
- num_rows: self.num_values, /* also don't need this when reading
- * data page */
+ num_rows: self.num_values, /* num_rows is only needed by skip_records; skipping REPEATED fields is not supported yet,
+ * so we can assume num_values == num_rows */
def_levels_byte_len: self.def_levels_byte_len,
rep_levels_byte_len: self.rep_levels_byte_len,
is_compressed: false,
@@ -149,13 +150,13 @@ impl DataPageBuilder for DataPageBuilderImpl {
/// A utility page reader which stores pages in memory.
pub struct InMemoryPageReader<P: Iterator<Item = Page>> {
- page_iter: P,
+ page_iter: Peekable<P>,
}
impl<P: Iterator<Item = Page>> InMemoryPageReader<P> {
pub fn new(pages: impl IntoIterator<Item = Page, IntoIter = P>) -> Self {
Self {
- page_iter: pages.into_iter(),
+ page_iter: pages.into_iter().peekable(),
}
}
}
@@ -166,11 +167,29 @@ impl<P: Iterator<Item = Page> + Send> PageReader for InMemoryPageReader<P> {
}
fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
- unimplemented!()
+ if let Some(x) = self.page_iter.peek() {
+ match x {
+ Page::DataPage { num_values, .. } => Ok(Some(PageMetadata {
+ num_rows: *num_values as usize,
+ is_dict: false,
+ })),
+ Page::DataPageV2 { num_rows, .. } => Ok(Some(PageMetadata {
+ num_rows: *num_rows as usize,
+ is_dict: false,
+ })),
+ Page::DictionaryPage { .. } => Ok(Some(PageMetadata {
+ num_rows: 0,
+ is_dict: true,
+ })),
+ }
+ } else {
+ Ok(None)
+ }
}
fn skip_next_page(&mut self) -> Result<()> {
- unimplemented!()
+ self.page_iter.next();
+ Ok(())
}
}