This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new ee2818e92 Support peek_next_page and skip_next_page in 
InMemoryPageReader (#2407)
ee2818e92 is described below

commit ee2818e9266f2b0b1acb6432af1891cd505ad92c
Author: Yang Jiang <[email protected]>
AuthorDate: Fri Aug 12 17:27:16 2022 +0800

    Support peek_next_page and skip_next_page in InMemoryPageReader (#2407)
    
    * Support peek_next_page and skip_next_page in InMemoryPageReader
    
    * fix comment
---
 parquet/src/arrow/record_reader/mod.rs    | 182 ++++++++++++++++++++++++++++++
 parquet/src/util/test_common/page_util.rs |  31 ++++-
 2 files changed, 207 insertions(+), 6 deletions(-)

diff --git a/parquet/src/arrow/record_reader/mod.rs 
b/parquet/src/arrow/record_reader/mod.rs
index 88d45f3d7..18b4c9e07 100644
--- a/parquet/src/arrow/record_reader/mod.rs
+++ b/parquet/src/arrow/record_reader/mod.rs
@@ -786,4 +786,186 @@ mod tests {
         assert_eq!(record_reader.num_records(), 8);
         assert_eq!(record_reader.num_values(), 14);
     }
+
+    #[test]
+    fn test_skip_required_records() {
+        // Construct column schema
+        let message_type = "
+        message test_schema {
+          REQUIRED INT32 leaf;
+        }
+        ";
+        let desc = parse_message_type(message_type)
+            .map(|t| SchemaDescriptor::new(Arc::new(t)))
+            .map(|s| s.column(0))
+            .unwrap();
+
+        // Construct record reader
+        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());
+
+        // First page
+
+        // Records data:
+        // test_schema
+        //   leaf: 4
+        // test_schema
+        //   leaf: 7
+        // test_schema
+        //   leaf: 6
+        // test_schema
+        //   leaf: 3
+        // test_schema
+        //   leaf: 2
+        {
+            let values = [4, 7, 6, 3, 2];
+            let mut pb = DataPageBuilderImpl::new(desc.clone(), 5, true);
+            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
+            let page = pb.consume();
+
+            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
+            record_reader.set_page_reader(page_reader).unwrap();
+            assert_eq!(2, record_reader.skip_records(2).unwrap());
+            assert_eq!(0, record_reader.num_records());
+            assert_eq!(0, record_reader.num_values());
+            assert_eq!(3, record_reader.read_records(3).unwrap());
+            assert_eq!(3, record_reader.num_records());
+            assert_eq!(3, record_reader.num_values());
+        }
+
+        // Second page
+
+        // Records data:
+        // test_schema
+        //   leaf: 8
+        // test_schema
+        //   leaf: 9
+        {
+            let values = [8, 9];
+            let mut pb = DataPageBuilderImpl::new(desc, 2, true);
+            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
+            let page = pb.consume();
+
+            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
+            record_reader.set_page_reader(page_reader).unwrap();
+            assert_eq!(2, record_reader.skip_records(10).unwrap());
+            assert_eq!(3, record_reader.num_records());
+            assert_eq!(3, record_reader.num_values());
+            assert_eq!(0, record_reader.read_records(10).unwrap());
+        }
+
+        let mut bb = Int32BufferBuilder::new(3);
+        bb.append_slice(&[6, 3, 2]);
+        let expected_buffer = bb.finish();
+        assert_eq!(expected_buffer, record_reader.consume_record_data());
+        assert_eq!(None, record_reader.consume_def_levels());
+        assert_eq!(None, record_reader.consume_bitmap());
+    }
+
+    #[test]
+    fn test_skip_optional_records() {
+        // Construct column schema
+        let message_type = "
+        message test_schema {
+          OPTIONAL Group test_struct {
+            OPTIONAL INT32 leaf;
+          }
+        }
+        ";
+
+        let desc = parse_message_type(message_type)
+            .map(|t| SchemaDescriptor::new(Arc::new(t)))
+            .map(|s| s.column(0))
+            .unwrap();
+
+        // Construct record reader
+        let mut record_reader = RecordReader::<Int32Type>::new(desc.clone());
+
+        // First page
+
+        // Records data:
+        // test_schema
+        //   test_struct
+        // test_schema
+        //   test_struct
+        //     leaf: 7
+        // test_schema
+        // test_schema
+        //   test_struct
+        //     leaf: 6
+        // test_schema
+        //   test_struct
+        //     leaf: 3
+        {
+            let values = [7, 6, 3];
+            //empty, non-empty, empty, non-empty, non-empty
+            let def_levels = [1i16, 2i16, 0i16, 2i16, 2i16];
+            let mut pb = DataPageBuilderImpl::new(desc.clone(), 5, true);
+            pb.add_def_levels(2, &def_levels);
+            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
+            let page = pb.consume();
+
+            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
+            record_reader.set_page_reader(page_reader).unwrap();
+            assert_eq!(2, record_reader.skip_records(2).unwrap());
+            assert_eq!(0, record_reader.num_records());
+            assert_eq!(0, record_reader.num_values());
+            assert_eq!(3, record_reader.read_records(3).unwrap());
+            assert_eq!(3, record_reader.num_records());
+            assert_eq!(3, record_reader.num_values());
+        }
+
+        // Second page
+
+        // Records data:
+        // test_schema
+        // test_schema
+        //   test_struct
+        //     leaf: 8
+        {
+            let values = [8];
+            //empty, non-empty
+            let def_levels = [0i16, 2i16];
+            let mut pb = DataPageBuilderImpl::new(desc, 2, true);
+            pb.add_def_levels(2, &def_levels);
+            pb.add_values::<Int32Type>(Encoding::PLAIN, &values);
+            let page = pb.consume();
+
+            let page_reader = Box::new(InMemoryPageReader::new(vec![page]));
+            record_reader.set_page_reader(page_reader).unwrap();
+            assert_eq!(2, record_reader.skip_records(10).unwrap());
+            assert_eq!(3, record_reader.num_records());
+            assert_eq!(3, record_reader.num_values());
+            assert_eq!(0, record_reader.read_records(10).unwrap());
+        }
+
+        // Verify result def levels
+        let mut bb = Int16BufferBuilder::new(7);
+        bb.append_slice(&[0i16, 2i16, 2i16]);
+        let expected_def_levels = bb.finish();
+        assert_eq!(
+            Some(expected_def_levels),
+            record_reader.consume_def_levels()
+        );
+
+        // Verify bitmap
+        let expected_valid = &[false, true, true];
+        let expected_buffer = 
Buffer::from_iter(expected_valid.iter().cloned());
+        let expected_bitmap = Bitmap::from(expected_buffer);
+        assert_eq!(Some(expected_bitmap), record_reader.consume_bitmap());
+
+        // Verify result record data
+        let actual = record_reader.consume_record_data();
+        let actual_values = actual.typed_data::<i32>();
+
+        let expected = &[0, 6, 3];
+        assert_eq!(actual_values.len(), expected.len());
+
+        // Only validate valid values are equal
+        let iter = expected_valid.iter().zip(actual_values).zip(expected);
+        for ((valid, actual), expected) in iter {
+            if *valid {
+                assert_eq!(actual, expected)
+            }
+        }
+    }
 }
diff --git a/parquet/src/util/test_common/page_util.rs 
b/parquet/src/util/test_common/page_util.rs
index dffcb2a44..243fb6f8b 100644
--- a/parquet/src/util/test_common/page_util.rs
+++ b/parquet/src/util/test_common/page_util.rs
@@ -24,6 +24,7 @@ use crate::encodings::levels::LevelEncoder;
 use crate::errors::Result;
 use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
 use crate::util::memory::ByteBufferPtr;
+use std::iter::Peekable;
 use std::mem;
 
 pub trait DataPageBuilder {
@@ -127,8 +128,8 @@ impl DataPageBuilder for DataPageBuilderImpl {
                 encoding: self.encoding.unwrap(),
                 num_nulls: 0, /* set to dummy value - don't need this when 
reading
                                * data page */
-                num_rows: self.num_values, /* also don't need this when reading
-                                            * data page */
+                num_rows: self.num_values, /* num_rows is only needed by 
skip_records; skipping REPEATED fields is not supported yet,
+                                            * so we can assume num_values == 
num_rows */
                 def_levels_byte_len: self.def_levels_byte_len,
                 rep_levels_byte_len: self.rep_levels_byte_len,
                 is_compressed: false,
@@ -149,13 +150,13 @@ impl DataPageBuilder for DataPageBuilderImpl {
 
 /// A utility page reader which stores pages in memory.
 pub struct InMemoryPageReader<P: Iterator<Item = Page>> {
-    page_iter: P,
+    page_iter: Peekable<P>,
 }
 
 impl<P: Iterator<Item = Page>> InMemoryPageReader<P> {
     pub fn new(pages: impl IntoIterator<Item = Page, IntoIter = P>) -> Self {
         Self {
-            page_iter: pages.into_iter(),
+            page_iter: pages.into_iter().peekable(),
         }
     }
 }
@@ -166,11 +167,29 @@ impl<P: Iterator<Item = Page> + Send> PageReader for 
InMemoryPageReader<P> {
     }
 
     fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
-        unimplemented!()
+        if let Some(x) = self.page_iter.peek() {
+            match x {
+                Page::DataPage { num_values, .. } => Ok(Some(PageMetadata {
+                    num_rows: *num_values as usize,
+                    is_dict: false,
+                })),
+                Page::DataPageV2 { num_rows, .. } => Ok(Some(PageMetadata {
+                    num_rows: *num_rows as usize,
+                    is_dict: false,
+                })),
+                Page::DictionaryPage { .. } => Ok(Some(PageMetadata {
+                    num_rows: 0,
+                    is_dict: true,
+                })),
+            }
+        } else {
+            Ok(None)
+        }
     }
 
     fn skip_next_page(&mut self) -> Result<()> {
-        unimplemented!()
+        self.page_iter.next();
+        Ok(())
     }
 }
 

Reply via email to